This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push: new 3267b041 Support background property repair (#712) 3267b041 is described below commit 3267b0418558912ef1a28f48666336d6e7bc2c0d Author: mrproliu <741550...@qq.com> AuthorDate: Mon Aug 4 07:19:53 2025 +0800 Support background property repair (#712) * Support background property repair --- api/proto/banyandb/property/v1/gossip.proto | 5 +- api/proto/banyandb/property/v1/repair.proto | 105 ++++ banyand/property/db.go | 33 +- banyand/property/gossip/client.go | 17 +- banyand/property/gossip/message.go | 12 +- banyand/property/gossip/server.go | 89 ++- banyand/property/gossip/service.go | 62 +- banyand/property/gossip/service_test.go | 19 +- banyand/property/repair.go | 303 ++++++++-- banyand/property/repair_gossip.go | 891 ++++++++++++++++++++++++++++ banyand/property/repair_gossip_test.go | 581 ++++++++++++++++++ banyand/property/repair_test.go | 26 +- banyand/property/service.go | 23 +- banyand/property/shard.go | 32 +- banyand/property/shard_test.go | 20 +- docs/api-reference.md | 217 +++++++ docs/concept/property-repair.md | 25 +- pkg/index/inverted/query.go | 19 + 18 files changed, 2306 insertions(+), 173 deletions(-) diff --git a/api/proto/banyandb/property/v1/gossip.proto b/api/proto/banyandb/property/v1/gossip.proto index 9a23c02c..6bae7523 100644 --- a/api/proto/banyandb/property/v1/gossip.proto +++ b/api/proto/banyandb/property/v1/gossip.proto @@ -30,8 +30,9 @@ message PropagationContext { } message PropagationRequest { - PropagationContext context = 2; - string group = 3; + PropagationContext context = 1; + string group = 2; + uint32 shard_id = 3; } message PropagationResponse {} diff --git a/api/proto/banyandb/property/v1/repair.proto b/api/proto/banyandb/property/v1/repair.proto new file mode 100644 index 00000000..24510ed9 --- /dev/null +++ b/api/proto/banyandb/property/v1/repair.proto @@ -0,0 +1,105 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +syntax = "proto3"; + +package banyandb.property.v1; + +import "banyandb/property/v1/property.proto"; + +option go_package = "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"; +option java_package = "org.apache.skywalking.banyandb.property.v1"; + +message TreeSlotSHA { + int32 slot = 1; + string value = 2; +} + +message TreeLeafNode { + // slot_index is the index of the slot in the tree. + int32 slot_index = 1; + // if the slot is empty, means the server side don't have the slot. + bool exists = 2; + // if the slot and entity exists, the SHA value of the entity. 
+ string entity = 3; + string sha = 4; +} + +message TreeRoot { + string group = 1; + uint32 shard_id = 2; + string root_sha = 3; +} + +message TreeSlots { + repeated TreeSlotSHA slot_sha = 1; +} + +message PropertyMissing { + string entity = 1; +} + +message RootCompare { + bool tree_found = 1; + bool root_sha_match = 2; +} + +message DifferTreeSummary { + // if the nodes is empty, mean the server side don't have more tree leaf nodes to send. + repeated TreeLeafNode nodes = 2; +} + +message PropertySync { + bytes id = 1; + banyandb.property.v1.Property property = 2; + int64 delete_time = 3; +} + +message NoMorePropertySync {} + +message RepairRequest { + oneof data { + // compare stage + TreeRoot tree_root = 1; + TreeSlots tree_slots = 2; + // repair stage + // case 1: client missing but server existing + PropertyMissing property_missing = 3; + // case 2: client existing but server missing + // case 3: SHA value mismatches + PropertySync property_sync = 4; + // if client side is already send all the properties(missing or property sync) + // which means the client side will not sending more properties to sync, server side should close the stream. + NoMorePropertySync no_more_property_sync = 5; + } +} + +message RepairResponse { + oneof data { + // compare stage + RootCompare root_compare = 1; + DifferTreeSummary differ_tree_summary = 2; + // repair stage + // case 1: return from PropertyMissing + // case 3: return if the client is older + PropertySync property_sync = 3; + } +} + +service RepairService { + rpc Repair(stream RepairRequest) returns (stream RepairResponse) {} +} diff --git a/banyand/property/db.go b/banyand/property/db.go index 14a5b1ea..ea5475ab 100644 --- a/banyand/property/db.go +++ b/banyand/property/db.go @@ -32,7 +32,9 @@ import ( "github.com/apache/skywalking-banyandb/api/common" propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" "github.com/apache/skywalking-banyandb/banyand/internal/storage" + "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/observability" + "github.com/apache/skywalking-banyandb/banyand/property/gossip" "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/index/inverted" "github.com/apache/skywalking-banyandb/pkg/logger" @@ -48,19 +50,20 @@ var ( ) type database struct { - lock fs.File + metadata metadata.Repo omr observability.MetricsRegistry + lfs fs.FileSystem + lock fs.File logger *logger.Logger repairScheduler *repairScheduler - lfs fs.FileSystem sLst atomic.Pointer[[]*shard] location string repairBaseDir string flushInterval time.Duration expireDelete time.Duration repairTreeSlotCount int - closed atomic.Bool mu sync.RWMutex + closed atomic.Bool } func openDB( @@ -71,9 +74,13 @@ func openDB( repairSlotCount int, omr observability.MetricsRegistry, lfs fs.FileSystem, + repairEnabled bool, repairBaseDir string, repairBuildTreeCron string, repairQuickBuildTreeTime time.Duration, + repairTriggerCron string, + gossipMessenger gossip.Messenger, + metadata metadata.Repo, buildSnapshotFunc func(context.Context) (string, error), ) (*database, error) { loc := filepath.Clean(location) @@ -89,13 +96,18 @@ func openDB( repairTreeSlotCount: repairSlotCount, repairBaseDir: repairBaseDir, lfs: lfs, + metadata: metadata, } + var err error // init repair scheduler - scheduler, err := newRepairScheduler(l, omr, repairBuildTreeCron, repairQuickBuildTreeTime, db, buildSnapshotFunc) - if err != nil { - return nil, errors.Wrapf(err, "failed to 
create repair scheduler for %s", loc) + if repairEnabled { + scheduler, schedulerErr := newRepairScheduler(l, omr, repairBuildTreeCron, repairQuickBuildTreeTime, repairTriggerCron, + gossipMessenger, repairSlotCount, db, buildSnapshotFunc) + if schedulerErr != nil { + return nil, errors.Wrapf(schedulerErr, "failed to create repair scheduler for %s", loc) + } + db.repairScheduler = scheduler } - db.repairScheduler = scheduler if err = db.load(ctx); err != nil { return nil, err } @@ -211,7 +223,9 @@ func (db *database) close() error { if db.closed.Swap(true) { return nil } - db.repairScheduler.close() + if db.repairScheduler != nil { + db.repairScheduler.close() + } sLst := db.sLst.Load() var err error if sLst != nil { @@ -241,7 +255,8 @@ func (db *database) repair(ctx context.Context, id []byte, shardID uint64, prope if err != nil { return errors.WithMessagef(err, "failed to load shard %d", id) } - return s.repair(ctx, id, property, deleteTime) + _, _, err = s.repair(ctx, id, property, deleteTime) + return err } type walkFn func(suffix string) error diff --git a/banyand/property/gossip/client.go b/banyand/property/gossip/client.go index 3a501d22..41da6e70 100644 --- a/banyand/property/gossip/client.go +++ b/banyand/property/gossip/client.go @@ -26,7 +26,7 @@ import ( "github.com/apache/skywalking-banyandb/banyand/metadata/schema" ) -func (s *service) Propagation(nodes []string, group string) error { +func (s *service) Propagation(nodes []string, group string, shardID uint32) error { if len(nodes) < 2 { return fmt.Errorf("must provide at least 2 node") } @@ -45,6 +45,7 @@ func (s *service) Propagation(nodes []string, group string) error { request := &propertyv1.PropagationRequest{ Context: ctx, Group: group, + ShardId: shardID, } var sendTo func(context.Context, *propertyv1.PropagationRequest) (*propertyv1.PropagationResponse, error) @@ -84,6 +85,18 @@ func (s *service) Propagation(nodes []string, group string) error { return nil } +func (s *service) LocateNodes(group string, shardID, replicasCount uint32) ([]string, error) { + result := make([]string, 0, replicasCount) + for r := range replicasCount { + node, err := s.sel.Pick(group, "", shardID, r) + if err != nil { + return nil, fmt.Errorf("failed to locate node for group %s, shardID %d, replica %d: %w", group, shardID, r, err) + } + result = append(result, node) + } + return result, nil +} + func (s *service) getRegisteredNode(id string) (*databasev1.Node, bool) { s.mu.RLock() defer s.mu.RUnlock() @@ -100,6 +113,7 @@ func (s *service) OnAddOrUpdate(md schema.Metadata) { s.log.Warn().Msg("invalid metadata type") return } + s.sel.AddNode(node) address := node.PropertyRepairGossipGrpcAddress if address == "" { s.log.Warn().Stringer("node", node).Msg("node does not have gossip address, skipping registration") @@ -132,6 +146,7 @@ func (s *service) OnDelete(md schema.Metadata) { s.log.Warn().Stringer("node", node).Msg("node does not have a name, skipping deregistration") return } + s.sel.RemoveNode(node) s.mu.Lock() defer s.mu.Unlock() diff --git a/banyand/property/gossip/message.go b/banyand/property/gossip/message.go index 4ec1b167..fe26cd1c 100644 --- a/banyand/property/gossip/message.go +++ b/banyand/property/gossip/message.go @@ -20,12 +20,16 @@ package gossip import ( "context" + "github.com/pkg/errors" "google.golang.org/grpc" propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" "github.com/apache/skywalking-banyandb/pkg/run" ) +// ErrAbortPropagation is an error that indicates the gossip propagation of a 
message should be aborted. +var ErrAbortPropagation = errors.New("abort propagation") + // MessageListener is an interface that defines a method to handle the incoming propagation message. type MessageListener interface { Rev(ctx context.Context, nextNode *grpc.ClientConn, request *propertyv1.PropagationRequest) error @@ -48,14 +52,18 @@ type Messenger interface { type MessageClient interface { run.Unit // Propagation using anti-entropy gossip protocol to propagate messages to the specified nodes. - Propagation(nodes []string, topic string) error + Propagation(nodes []string, group string, shardID uint32) error + // LocateNodes finds nodes in the specified group and shard ID, returning a list of node addresses. + LocateNodes(group string, shardID, replicasCount uint32) ([]string, error) } // MessageServer is an interface that defines methods for subscribing to topics and receiving messages in a gossip protocol. type MessageServer interface { run.Unit // Subscribe allows subscribing to a topic to receive messages. - Subscribe(listener MessageListener) error + Subscribe(listener MessageListener) + // RegisterServices registers the gRPC services with the provided server. + RegisterServices(func(r *grpc.Server)) // GetServerPort returns the port number of the server. GetServerPort() *uint32 } diff --git a/banyand/property/gossip/server.go b/banyand/property/gossip/server.go index ff943ada..c7e7ac34 100644 --- a/banyand/property/gossip/server.go +++ b/banyand/property/gossip/server.go @@ -49,13 +49,21 @@ var ( "RetryableStatusCodes": [ "UNAVAILABLE" ] } }]}`, serviceName) + + // perNodeSyncTimeout is the timeout for each node to sync the property data. + perNodeSyncTimeout = time.Minute * 10 ) -func (s *service) Subscribe(listener MessageListener) error { +func (s *service) Subscribe(listener MessageListener) { s.listenersLock.Lock() defer s.listenersLock.Unlock() s.listeners = append(s.listeners, listener) - return nil +} + +func (s *service) RegisterServices(f func(r *grpc.Server)) { + s.serviceRegisterLock.Lock() + defer s.serviceRegisterLock.Unlock() + s.serviceRegister = append(s.serviceRegister, f) } func (s *service) getListener() MessageListener { @@ -67,7 +75,16 @@ func (s *service) getListener() MessageListener { return s.listeners[0] } -type groupPropagation struct { +func (s *service) getServiceRegisters() []func(server *grpc.Server) { + s.serviceRegisterLock.RLock() + defer s.serviceRegisterLock.RUnlock() + if len(s.serviceRegister) == 0 { + return nil + } + return s.serviceRegister +} + +type groupWithShardPropagation struct { latestTime time.Time channel chan *propertyv1.PropagationRequest originalNodeID string @@ -75,17 +92,17 @@ type groupPropagation struct { type protocolHandler struct { propertyv1.UnimplementedGossipServiceServer - s *service - groups map[string]*groupPropagation - groupNotify chan struct{} - mu sync.RWMutex + s *service + groupWithShards map[string]*groupWithShardPropagation + groupNotify chan struct{} + mu sync.RWMutex } func newProtocolHandler(s *service) *protocolHandler { return &protocolHandler{ - s: s, - groups: make(map[string]*groupPropagation), - groupNotify: make(chan struct{}, 10), + s: s, + groupWithShards: make(map[string]*groupWithShardPropagation), + groupNotify: make(chan struct{}, 10), } } @@ -98,7 +115,9 @@ func (q *protocolHandler) processPropagation() { if request == nil { continue } - err := q.handle(q.s.closer.Ctx(), request) + timeoutCtx, cancelFunc := context.WithTimeout(q.s.closer.Ctx(), perNodeSyncTimeout) + err := 
q.handle(timeoutCtx, request) + cancelFunc() if err != nil { q.s.log.Warn().Err(err).Stringer("request", request). Msgf("handle propagation request failure") @@ -112,7 +131,7 @@ func (q *protocolHandler) processPropagation() { func (q *protocolHandler) findUnProcessRequest() *propertyv1.PropagationRequest { q.mu.Lock() defer q.mu.Unlock() - for _, g := range q.groups { + for _, g := range q.groupWithShards { select { case d := <-g.channel: return d @@ -141,6 +160,7 @@ func (q *protocolHandler) handle(ctx context.Context, request *propertyv1.Propag now := n.UnixNano() nodes := request.Context.Nodes q.s.serverMetrics.totalStarted.Inc(1, request.Group) + q.s.log.Debug().Stringer("request", request).Msgf("handling gossip message for propagation") var needsKeepPropagation bool defer func() { if err != nil { @@ -175,6 +195,11 @@ func (q *protocolHandler) handle(ctx context.Context, request *propertyv1.Propag // process the message using the listener err = listener.Rev(ctx, nextNodeConn, request) if err != nil { + if errors.Is(err, ErrAbortPropagation) { + q.s.log.Warn().Err(err).Msgf("propagation aborted by listener for node: %s", nextNodeID) + _ = nextNodeConn.Close() + return nil // Abort propagation, no need to continue + } q.s.log.Warn().Err(err).Msgf("failed to process with next node: %s", nextNodeID) _ = nextNodeConn.Close() continue @@ -199,6 +224,14 @@ func (q *protocolHandler) handle(ctx context.Context, request *propertyv1.Propag return nil } + if q.contextIsDone(ctx) { + if nextNodeConn != nil { + _ = nextNodeConn.Close() + } + q.s.log.Debug().Msgf("context is done, no need to propagate further") + return nil + } + // propagate the message to the next node needsKeepPropagation = true q.s.serverMetrics.totalSendToNextStarted.Inc(1, request.Group) @@ -217,42 +250,52 @@ func (q *protocolHandler) handle(ctx context.Context, request *propertyv1.Propag return nil } +func (q *protocolHandler) contextIsDone(ctx context.Context) bool { + select { + case <-ctx.Done(): + return true + default: + return false + } +} + func (q *protocolHandler) addToProcess(request *propertyv1.PropagationRequest) bool { q.mu.Lock() defer q.mu.Unlock() - group, exist := q.groups[request.Group] + shardKey := fmt.Sprintf("%s_%d", request.Group, request.ShardId) + groupShard, exist := q.groupWithShards[shardKey] if !exist { - group = &groupPropagation{ + groupShard = &groupWithShardPropagation{ channel: make(chan *propertyv1.PropagationRequest, 1), originalNodeID: request.Context.OriginNode, latestTime: time.Now(), } - group.channel <- request - q.groups[request.Group] = group + groupShard.channel <- request + q.groupWithShards[shardKey] = groupShard q.notifyNewRequest() return true } // if the latest round is out of ttl, then needs to change to current node to executing - if time.Since(group.latestTime) > q.s.scheduleInterval/2 { - group.originalNodeID = request.Context.OriginNode + if time.Since(groupShard.latestTime) > q.s.scheduleInterval/2 { + groupShard.originalNodeID = request.Context.OriginNode select { - case group.channel <- request: + case groupShard.channel <- request: q.notifyNewRequest() default: - q.s.log.Error().Msgf("ready to added propagation into group %s in a new round, but it's full", request.Group) + q.s.log.Error().Msgf("ready to added propagation into group shard %s(%d) in a new round, but it's full", request.Group, request.ShardId) } return true } // if the original node ID are a same node, means which from the same round - if group.originalNodeID == request.Context.OriginNode { + if 
groupShard.originalNodeID == request.Context.OriginNode { select { - case group.channel <- request: + case groupShard.channel <- request: q.notifyNewRequest() default: - q.s.log.Error().Msgf("ready to added propagation into group %s in a same round, but it's full", request.Group) + q.s.log.Error().Msgf("ready to added propagation into group shard %s(%d) in a same round, but it's full", request.Group, request.ShardId) } return true } diff --git a/banyand/property/gossip/service.go b/banyand/property/gossip/service.go index a52319ee..625b5ff7 100644 --- a/banyand/property/gossip/service.go +++ b/banyand/property/gossip/service.go @@ -41,6 +41,7 @@ import ( "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/node" "github.com/apache/skywalking-banyandb/pkg/run" ) @@ -60,31 +61,32 @@ const ( type service struct { schema.UnimplementedOnInitHandler - metadata metadata.Repo - creds credentials.TransportCredentials - omr observability.MetricsRegistry - registered map[string]*databasev1.Node - ser *grpclib.Server - serverMetrics *serverMetrics - log *logger.Logger - closer *run.Closer - protocolHandler *protocolHandler - keyFile string - addr string - certFile string - host string - nodeID string - caCertPath string - listeners []MessageListener - maxRecvMsgSize run.Bytes - listenersLock sync.RWMutex - mu sync.RWMutex - port uint32 - tls bool - - totalTimeout time.Duration - // TODO: should pass by property repair configuration - scheduleInterval time.Duration + metadata metadata.Repo + creds credentials.TransportCredentials + omr observability.MetricsRegistry + sel node.Selector + registered map[string]*databasev1.Node + ser *grpclib.Server + serverMetrics *serverMetrics + log *logger.Logger + closer *run.Closer + protocolHandler *protocolHandler + certFile string + keyFile string + host string + nodeID string + caCertPath string + addr string + listeners []MessageListener + serviceRegister []func(s *grpclib.Server) + maxRecvMsgSize run.Bytes + totalTimeout time.Duration + scheduleInterval time.Duration + serviceRegisterLock sync.RWMutex + mu sync.RWMutex + listenersLock sync.RWMutex + port uint32 + tls bool } // NewMessenger creates a new instance of Messenger for gossip propagation communication between nodes. @@ -99,12 +101,15 @@ func NewMessenger(omr observability.MetricsRegistry, metadata metadata.Repo) Mes listeners: make([]MessageListener, 0), registered: make(map[string]*databasev1.Node), scheduleInterval: time.Hour * 2, + sel: node.NewRoundRobinSelector("", metadata), } } // NewMessengerWithoutMetadata creates a new instance of Messenger without metadata. -func NewMessengerWithoutMetadata(omr observability.MetricsRegistry) Messenger { - return NewMessenger(omr, nil) +func NewMessengerWithoutMetadata(omr observability.MetricsRegistry, port int) Messenger { + messenger := NewMessenger(omr, nil) + messenger.(*service).port = uint32(port) + return messenger } func (s *service) PreRun(ctx context.Context) error { @@ -207,6 +212,9 @@ func (s *service) Serve(stopCh chan struct{}) { s.ser = grpclib.NewServer(opts...) 
propertyv1.RegisterGossipServiceServer(s.ser, s.protocolHandler) + for _, register := range s.getServiceRegisters() { + register(s.ser) + } var wg sync.WaitGroup wg.Add(1) go func() { diff --git a/banyand/property/gossip/service_test.go b/banyand/property/gossip/service_test.go index a36f3a38..82fb4259 100644 --- a/banyand/property/gossip/service_test.go +++ b/banyand/property/gossip/service_test.go @@ -68,7 +68,7 @@ var _ = ginkgo.Describe("Propagation Messenger", func() { nodes = startNodes(1) node := nodes[0] - err := node.messenger.Propagation([]string{node.nodeID}, "test") + err := node.messenger.Propagation([]string{node.nodeID}, "test", 0) gomega.Expect(err).To(gomega.HaveOccurred()) }) @@ -76,7 +76,7 @@ var _ = ginkgo.Describe("Propagation Messenger", func() { nodes = startNodes(2) node1, node2 := nodes[0], nodes[1] - err := node1.messenger.Propagation([]string{node1.nodeID, node2.nodeID}, mockGroup) + err := node1.messenger.Propagation([]string{node1.nodeID, node2.nodeID}, mockGroup, 0) gomega.Expect(err).NotTo(gomega.HaveOccurred()) nodeVerify(node1, []string{node2.nodeID}, 1) }) @@ -85,7 +85,7 @@ var _ = ginkgo.Describe("Propagation Messenger", func() { nodes = startNodes(2) node1, node2 := nodes[0], nodes[1] - err := node2.messenger.Propagation([]string{node1.nodeID, node2.nodeID}, mockGroup) + err := node2.messenger.Propagation([]string{node1.nodeID, node2.nodeID}, mockGroup, 0) gomega.Expect(err).NotTo(gomega.HaveOccurred()) nodeVerify(node1, []string{node2.nodeID}, 1) nodeVerify(node2, []string{}, 0) @@ -95,7 +95,7 @@ var _ = ginkgo.Describe("Propagation Messenger", func() { nodes = startNodes(2) node1, node2 := nodes[0], nodes[1] - err := node2.messenger.Propagation([]string{node1.nodeID, node2.nodeID, "no-existing"}, mockGroup) + err := node2.messenger.Propagation([]string{node1.nodeID, node2.nodeID, "no-existing"}, mockGroup, 0) gomega.Expect(err).NotTo(gomega.HaveOccurred()) nodeVerify(node1, []string{node2.nodeID, node2.nodeID}, 2) nodeVerify(node2, []string{node1.nodeID}, 1) @@ -105,7 +105,7 @@ var _ = ginkgo.Describe("Propagation Messenger", func() { nodes = startNodes(3) node1, node2, node3 := nodes[0], nodes[1], nodes[2] - err := node1.messenger.Propagation([]string{node1.nodeID, node2.nodeID, node3.nodeID}, mockGroup) + err := node1.messenger.Propagation([]string{node1.nodeID, node2.nodeID, node3.nodeID}, mockGroup, 0) gomega.Expect(err).NotTo(gomega.HaveOccurred()) nodeVerify(node1, []string{node2.nodeID}, 1) nodeVerify(node2, []string{node3.nodeID}, 1) @@ -117,7 +117,7 @@ var _ = ginkgo.Describe("Propagation Messenger", func() { node1, node2, node3 := nodes[0], nodes[1], nodes[2] node3.listener.mockErr = fmt.Errorf("mock error") - err := node1.messenger.Propagation([]string{node1.nodeID, node2.nodeID, node3.nodeID}, mockGroup) + err := node1.messenger.Propagation([]string{node1.nodeID, node2.nodeID, node3.nodeID}, mockGroup, 0) gomega.Expect(err).NotTo(gomega.HaveOccurred()) nodeVerify(node1, []string{node2.nodeID}, 1) nodeVerify(node2, []string{node3.nodeID}, 1) @@ -129,9 +129,9 @@ var _ = ginkgo.Describe("Propagation Messenger", func() { node1, node2, node3 := nodes[0], nodes[1], nodes[2] node1.listener.delay = time.Second - err := node1.messenger.Propagation([]string{node1.nodeID, node2.nodeID, node3.nodeID}, mockGroup) + err := node1.messenger.Propagation([]string{node1.nodeID, node2.nodeID, node3.nodeID}, mockGroup, 0) gomega.Expect(err).NotTo(gomega.HaveOccurred()) - err = node2.messenger.Propagation([]string{node1.nodeID, node2.nodeID, node3.nodeID}, 
mockGroup) + err = node2.messenger.Propagation([]string{node1.nodeID, node2.nodeID, node3.nodeID}, mockGroup, 0) gomega.Expect(err).NotTo(gomega.HaveOccurred()) // should only the first propagation execute, the second one is ignored @@ -183,9 +183,8 @@ func startNodes(count int) []*nodeContext { gomega.Expect(err).NotTo(gomega.HaveOccurred()) // starting gossip messenger - messenger := NewMessengerWithoutMetadata(observability.NewBypassRegistry()) + messenger := NewMessengerWithoutMetadata(observability.NewBypassRegistry(), ports[0]) gomega.Expect(messenger).NotTo(gomega.BeNil()) - messenger.(*service).port = uint32(ports[0]) addr := fmt.Sprintf("127.0.0.1:%d", ports[0]) messenger.(run.PreRunner).PreRun(context.WithValue(context.Background(), common.ContextNodeKey, common.Node{ NodeID: addr, diff --git a/banyand/property/repair.go b/banyand/property/repair.go index 3083249e..d06cb0cd 100644 --- a/banyand/property/repair.go +++ b/banyand/property/repair.go @@ -26,11 +26,13 @@ import ( "fmt" "hash" "io" + "math/rand/v2" "os" "path" "path/filepath" "sort" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -42,10 +44,14 @@ import ( "github.com/pkg/errors" "github.com/robfig/cron/v3" "go.uber.org/multierr" + grpclib "google.golang.org/grpc" "github.com/apache/skywalking-banyandb/api/common" + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" "github.com/apache/skywalking-banyandb/banyand/internal/storage" "github.com/apache/skywalking-banyandb/banyand/observability" + "github.com/apache/skywalking-banyandb/banyand/property/gossip" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/logger" @@ -119,7 +125,7 @@ func (r *repair) checkHasUpdates() (bool, error) { return true, nil } -func (r *repair) buildStatus(ctx context.Context, snapshotPath string) (err error) { +func (r *repair) buildStatus(ctx context.Context, snapshotPath string, group string) (err error) { startTime := time.Now() defer func() { r.metrics.totalBuildTreeFinished.Inc(1) @@ -136,10 +142,15 @@ func (r *repair) buildStatus(ctx context.Context, snapshotPath string) (err erro sort.Sort(snapshotIDList(items)) blugeConf := bluge.DefaultConfig(snapshotPath) - err = r.buildTree(ctx, blugeConf) + err = r.buildTree(ctx, blugeConf, group) if err != nil { return fmt.Errorf("building trees failure: %w", err) } + // if only update a specific group, the repair base status file doesn't need to update + // because not all the group have been processed + if group != "" { + return nil + } var latestSnapshotID uint64 if len(items) > 0 { @@ -154,6 +165,9 @@ func (r *repair) buildStatus(ctx context.Context, snapshotPath string) (err erro if err != nil { return fmt.Errorf("marshall state failure: %w", err) } + if err = os.MkdirAll(filepath.Dir(r.statePath), storage.DirPerm); err != nil { + return fmt.Errorf("creating state directory failure: %w", err) + } err = os.WriteFile(r.statePath, stateVal, storage.FilePerm) if err != nil { return fmt.Errorf("writing state file failure: %w", err) @@ -161,15 +175,23 @@ func (r *repair) buildStatus(ctx context.Context, snapshotPath string) (err erro return nil } -func (r *repair) buildTree(ctx context.Context, conf bluge.Config) error { +func (r *repair) buildTree(ctx context.Context, conf bluge.Config, group string) error { reader, err := bluge.OpenReader(conf) if err != nil { + // means no data found + if 
strings.Contains(err.Error(), "unable to find a usable snapshot") { + return nil + } return fmt.Errorf("opening index reader failure: %w", err) } defer func() { _ = reader.Close() }() - topNSearch := bluge.NewTopNSearch(r.batchSearchSize, bluge.NewMatchAllQuery()) + query := bluge.Query(bluge.NewMatchAllQuery()) + if group != "" { + query = bluge.NewTermQuery(group).SetField(groupField) + } + topNSearch := bluge.NewTopNSearch(r.batchSearchSize, query) topNSearch.SortBy([]string{ fmt.Sprintf("+%s", groupField), fmt.Sprintf("+%s", nameField), @@ -185,7 +207,7 @@ func (r *repair) buildTree(ctx context.Context, conf bluge.Config) error { } group := convert.BytesToString(sortValue[0]) name := convert.BytesToString(sortValue[1]) - entity := fmt.Sprintf("%s/%s/%s", group, name, convert.BytesToString(sortValue[2])) + entity := r.buildLeafNodeEntity(group, name, convert.BytesToString(sortValue[2])) s := newSearchingProperty(group, shaValue, entity) if latestProperty != nil { @@ -226,6 +248,18 @@ func (r *repair) buildTree(ctx context.Context, conf bluge.Config) error { return nil } +func (r *repair) buildLeafNodeEntity(group, name, entityID string) string { + return fmt.Sprintf("%s/%s/%s", group, name, entityID) +} + +func (r *repair) parseLeafNodeEntity(entity string) (string, string, string, error) { + parts := strings.SplitN(entity, "/", 3) + if len(parts) != 3 { + return "", "", "", fmt.Errorf("invalid leaf node entity format: %s", entity) + } + return parts[0], parts[1], parts[2], nil +} + func (r *repair) pageSearch( ctx context.Context, reader *bluge.Reader, @@ -318,19 +352,28 @@ const ( type repairTreeNode struct { shaValue string - id string + entity string tp repairTreeNodeType leafStart int64 leafCount int64 + slotInx int32 } -type repairTreeReader struct { + +type repairTreeReader interface { + read(parent *repairTreeNode, pagingSize int64, forceReadFromStart bool) ([]*repairTreeNode, error) + close() error +} + +type repairTreeFileReader struct { file *os.File reader *bufio.Reader footer *repairTreeFooter paging *repairTreeReaderPage } -func (r *repair) treeReader(group string) (*repairTreeReader, error) { +func (r *repair) treeReader(group string) (repairTreeReader, error) { + r.scheduler.treeLocker.RLock() + defer r.scheduler.treeLocker.RUnlock() groupFile := fmt.Sprintf(r.composeTreeFilePathFmt, group) file, err := os.OpenFile(groupFile, os.O_RDONLY, os.ModePerm) if err != nil { @@ -340,7 +383,7 @@ func (r *repair) treeReader(group string) (*repairTreeReader, error) { } return nil, fmt.Errorf("opening repair tree file %s failure: %w", group, err) } - reader := &repairTreeReader{ + reader := &repairTreeFileReader{ file: file, reader: bufio.NewReader(file), } @@ -351,7 +394,18 @@ func (r *repair) treeReader(group string) (*repairTreeReader, error) { return reader, nil } -func (r *repairTreeReader) readFoot() error { +func (r *repair) stateFileExist() (bool, error) { + _, err := os.Stat(r.statePath) + if err != nil { + if os.IsNotExist(err) { + return false, nil + } + return false, fmt.Errorf("checking state file existence failure: %w", err) + } + return true, nil +} + +func (r *repairTreeFileReader) readFoot() error { stat, err := r.file.Stat() if err != nil { return fmt.Errorf("getting file stat for %s failure: %w", r.file.Name(), err) @@ -386,7 +440,7 @@ func (r *repairTreeReader) readFoot() error { return nil } -func (r *repairTreeReader) seekPosition(offset int64, whence int) error { +func (r *repairTreeFileReader) seekPosition(offset int64, whence int) error { _, err := 
r.file.Seek(offset, whence) if err != nil { return fmt.Errorf("seeking position failure: %w", err) @@ -395,7 +449,7 @@ func (r *repairTreeReader) seekPosition(offset int64, whence int) error { return nil } -func (r *repairTreeReader) read(parent *repairTreeNode, pagingSize int64) ([]*repairTreeNode, error) { +func (r *repairTreeFileReader) read(parent *repairTreeNode, pagingSize int64, forceReFromStart bool) ([]*repairTreeNode, error) { if parent == nil { // reading the root node err := r.seekPosition(r.footer.slotNodeFinishedOffset, io.SeekStart) @@ -421,7 +475,7 @@ func (r *repairTreeReader) read(parent *repairTreeNode, pagingSize int64) ([]*re var err error if parent.tp == repairTreeNodeTypeRoot { needSeek := false - if r.paging == nil || r.paging.lastNode != parent { + if r.paging == nil || r.paging.lastNode != parent || forceReFromStart { needSeek = true r.paging = newRepairTreeReaderPage(parent, r.footer.slotNodeCount) } @@ -458,7 +512,7 @@ func (r *repairTreeReader) read(parent *repairTreeNode, pagingSize int64) ([]*re } nodes = append(nodes, &repairTreeNode{ shaValue: string(slotShaVal), - id: fmt.Sprintf("%d", slotNodeIndex), + slotInx: int32(slotNodeIndex), tp: repairTreeNodeTypeSlot, leafStart: leafStartOff, leafCount: leafCount, @@ -472,9 +526,9 @@ func (r *repairTreeReader) read(parent *repairTreeNode, pagingSize int64) ([]*re // otherwise, reading the leaf nodes needSeek := false - if r.paging == nil || r.paging.lastNode != parent { + if r.paging == nil || r.paging.lastNode != parent || forceReFromStart { needSeek = true - r.paging = newRepairTreeReaderPage(parent, r.footer.slotNodeCount) + r.paging = newRepairTreeReaderPage(parent, parent.leafCount) } if needSeek { err = r.seekPosition(parent.leafStart, io.SeekStart) @@ -499,15 +553,16 @@ func (r *repairTreeReader) read(parent *repairTreeNode, pagingSize int64) ([]*re return nil, fmt.Errorf("decoding leaf node sha value from file %s failure: %w", r.file.Name(), err) } nodes = append(nodes, &repairTreeNode{ + slotInx: parent.slotInx, shaValue: string(shaVal), - id: string(entity), + entity: string(entity), tp: repairTreeNodeTypeLeaf, }) } return nodes, nil } -func (r *repairTreeReader) close() error { +func (r *repairTreeFileReader) close() error { return r.file.Close() } @@ -528,13 +583,65 @@ func (r *repairTreeReaderPage) nextPage(count int64) int64 { return 0 } readCount := r.reduceCount - count - if readCount < 0 { + if readCount <= 0 { readCount = r.reduceCount } r.reduceCount -= readCount return readCount } +type repairBufferLeafReader struct { + reader repairTreeReader + currentSlot *repairTreeNode + pagingLeafs []*repairTreeNode +} + +func newRepairBufferLeafReader(reader repairTreeReader) *repairBufferLeafReader { + return &repairBufferLeafReader{ + reader: reader, + } +} + +func (r *repairBufferLeafReader) next(slot *repairTreeNode) (*repairTreeNode, error) { + // slot is nil means reset the reader + if slot == nil { + r.currentSlot = nil + return nil, nil + } + if r.currentSlot == nil || r.currentSlot.slotInx != slot.slotInx { + // if the current slot is nil or the slot index is changed, we need to read the leaf nodes from the slot + err := r.readNodes(slot, true) + if err != nil { + return nil, err + } + } + // if no more leaf nodes, trying to read slots + if len(r.pagingLeafs) == 0 { + err := r.readNodes(slot, false) + if err != nil { + return nil, err + } + // no more leaf nodes to read, return nil + if len(r.pagingLeafs) == 0 { + return nil, nil + } + } + // pop the first leaf node from the paging leafs + leaf 
:= r.pagingLeafs[0] + r.pagingLeafs = r.pagingLeafs[1:] + return leaf, nil +} + +func (r *repairBufferLeafReader) readNodes(slot *repairTreeNode, forceReadFromStart bool) error { + nodes, err := r.reader.read(slot, repairBatchSearchSize, forceReadFromStart) + if err != nil { + return fmt.Errorf("reading leaf nodes from slot %d failure: %w", slot.slotInx, err) + } + r.pagingLeafs = nodes + r.currentSlot = slot + return nil +} + type repairTreeFooter struct { leafNodeFinishedOffset int64 slotNodeCount int64 @@ -824,24 +931,30 @@ func newRepairMetrics(fac *observability.Factory) *repairMetrics { type repairScheduler struct { latestBuildTreeSchedule time.Time buildTreeClock clock.Clock - l *logger.Logger + gossipMessenger gossip.Messenger closer *run.Closer buildSnapshotFunc func(context.Context) (string, error) repairTreeScheduler *timestamp.Scheduler quickRepairNotified *int32 db *database + l *logger.Logger metrics *repairSchedulerMetrics + treeSlotCount int buildTreeScheduleInterval time.Duration quickBuildTreeTime time.Duration lastBuildTimeLocker sync.Mutex - buildTreeLocker sync.Mutex + treeLocker sync.RWMutex } +// nolint: contextcheck func newRepairScheduler( l *logger.Logger, omr observability.MetricsRegistry, - cronExp string, + buildTreeCronExp string, quickBuildTreeTime time.Duration, + triggerCronExp string, + gossipMessenger gossip.Messenger, + treeSlotCount int, db *database, buildSnapshotFunc func(context.Context) (string, error), ) (*repairScheduler, error) { @@ -855,16 +968,29 @@ func newRepairScheduler( closer: run.NewCloser(1), quickBuildTreeTime: quickBuildTreeTime, metrics: newRepairSchedulerMetrics(omr.With(propertyScope.SubScope("scheduler"))), + gossipMessenger: gossipMessenger, + treeSlotCount: treeSlotCount, } c := timestamp.NewScheduler(l, s.buildTreeClock) s.repairTreeScheduler = c - err := c.Register("repair", cron.Descriptor, cronExp, func(t time.Time, _ *logger.Logger) bool { - s.doRepairScheduler(t, true) + err := c.Register("build-tree", cron.Descriptor, buildTreeCronExp, func(t time.Time, _ *logger.Logger) bool { + s.doBuildTreeScheduler(t, true) return true }) if err != nil { return nil, fmt.Errorf("failed to add repair build tree cron task: %w", err) } + err = c.Register("trigger", cron.Minute|cron.Hour|cron.Dom|cron.Month|cron.Dow|cron.Descriptor, + triggerCronExp, func(time.Time, *logger.Logger) bool { + gossipErr := s.doRepairGossip(s.closer.Ctx()) + if gossipErr != nil { + s.l.Err(fmt.Errorf("repair gossip failure: %w", gossipErr)) + } + return true + }) + if err != nil { + return nil, fmt.Errorf("failed to add repair trigger cron task: %w", err) + } err = s.initializeInterval() if err != nil { return nil, err @@ -872,18 +998,18 @@ func newRepairScheduler( return s, nil } -func (r *repairScheduler) doRepairScheduler(t time.Time, triggerByCron bool) { - if !r.verifyShouldExecute(t, triggerByCron) { +func (r *repairScheduler) doBuildTreeScheduler(t time.Time, triggerByCron bool) { + if !r.verifyShouldExecuteBuildTree(t, triggerByCron) { return } - err := r.doRepair() + err := r.doBuildTree() if err != nil { r.l.Err(fmt.Errorf("repair build status failure: %w", err)) } } -func (r *repairScheduler) verifyShouldExecute(t time.Time, triggerByCron bool) bool { +func (r *repairScheduler) verifyShouldExecuteBuildTree(t time.Time, triggerByCron bool) bool { r.lastBuildTimeLocker.Lock() defer r.lastBuildTimeLocker.Unlock() if !triggerByCron { @@ -900,7 +1026,7 @@ func (r *repairScheduler) verifyShouldExecute(t time.Time, triggerByCron bool) b func (r 
*repairScheduler) initializeInterval() error { r.lastBuildTimeLocker.Lock() defer r.lastBuildTimeLocker.Unlock() - interval, nextTime, exist := r.repairTreeScheduler.Interval("repair") + interval, nextTime, exist := r.repairTreeScheduler.Interval("build-tree") if !exist { return fmt.Errorf("failed to get repair build tree cron task interval") } @@ -910,7 +1036,7 @@ func (r *repairScheduler) initializeInterval() error { } //nolint:contextcheck -func (r *repairScheduler) doRepair() (err error) { +func (r *repairScheduler) doBuildTree() (err error) { now := time.Now() r.metrics.totalRepairBuildTreeStarted.Inc(1) defer func() { @@ -920,11 +1046,6 @@ func (r *repairScheduler) doRepair() (err error) { r.metrics.totalRepairBuildTreeFailures.Inc(1) } }() - if !r.buildTreeLocker.TryLock() { - return nil - } - defer r.buildTreeLocker.Unlock() - // check each shard have any updates sLst := r.db.sLst.Load() if sLst == nil { return nil @@ -945,6 +1066,22 @@ func (r *repairScheduler) doRepair() (err error) { } // otherwise, we need to build the trees + return r.buildingTree(nil, "", false) +} + +// nolint: contextcheck +func (r *repairScheduler) buildingTree(shards []common.ShardID, group string, force bool) error { + if force { + r.treeLocker.Lock() + } else if !r.treeLocker.TryLock() { + // if not forced, we try to lock the build tree locker + r.metrics.totalRepairBuildTreeConflicts.Inc(1) + return nil + } + defer r.treeLocker.Unlock() + + buildAll := len(shards) == 0 + // take a new snapshot first snapshotPath, err := r.buildSnapshotFunc(r.closer.Ctx()) if err != nil { @@ -955,11 +1092,24 @@ func (r *repairScheduler) doRepair() (err error) { if err != nil { return err } + if !buildAll { + // if not building all shards, check if the shard is in the list + found := false + for _, s := range shards { + if s == common.ShardID(id) { + found = true + break + } + } + if !found { + return nil // skip this shard + } + } s, err := r.db.loadShard(r.closer.Ctx(), common.ShardID(id)) if err != nil { return fmt.Errorf("loading shard %d failure: %w", id, err) } - err = s.repairState.buildStatus(r.closer.Ctx(), path.Join(snapshotPath, fmt.Sprintf("shard-%s", suffix))) + err = s.repairState.buildStatus(r.closer.Ctx(), path.Join(snapshotPath, fmt.Sprintf("shard-%s", suffix)), group) if err != nil { return fmt.Errorf("building status for shard %d failure: %w", id, err) } @@ -977,7 +1127,7 @@ func (r *repairScheduler) documentUpdatesNotify() { case <-r.closer.CloseNotify(): return case <-time.After(r.quickBuildTreeTime): - r.doRepairScheduler(r.buildTreeClock.Now(), false) + r.doBuildTreeScheduler(r.buildTreeClock.Now(), false) // reset the notified flag to allow the next notification atomic.StoreInt32(r.quickRepairNotified, 0) } @@ -990,18 +1140,81 @@ func (r *repairScheduler) close() { r.closer.CloseThenWait() } +func (r *repairScheduler) doRepairGossip(ctx context.Context) error { + group, shardNum, err := r.randomSelectGroup(ctx) + if err != nil { + return fmt.Errorf("selecting random group failure: %w", err) + } + + nodes, err := r.gossipMessenger.LocateNodes(group.Metadata.Name, shardNum, uint32(r.copiesCount(group))) + if err != nil { + return fmt.Errorf("locating nodes for group %s, shard %d failure: %w", group.Metadata.Name, shardNum, err) + } + return r.gossipMessenger.Propagation(nodes, group.Metadata.Name, shardNum) +} + +func (r *repairScheduler) randomSelectGroup(ctx context.Context) (*commonv1.Group, uint32, error) { + allGroups, err := r.db.metadata.GroupRegistry().ListGroup(ctx) + if err != nil { + 
return nil, 0, fmt.Errorf("listing groups failure: %w", err) + } + + groups := make([]*commonv1.Group, 0) + for _, group := range allGroups { + if group.Catalog != commonv1.Catalog_CATALOG_PROPERTY { + continue + } + // if the group don't have copies, skip it + if r.copiesCount(group) < 2 { + continue + } + groups = append(groups, group) + } + + if len(groups) == 0 { + return nil, 0, fmt.Errorf("no groups found with enough copies for repair") + } + // #nosec G404 -- not security-critical, just for random selection + group := groups[rand.Int64()%int64(len(groups))] + // #nosec G404 -- not security-critical, just for random selection + return group, rand.Uint32() % group.ResourceOpts.ShardNum, nil +} + +func (r *repairScheduler) copiesCount(group *commonv1.Group) int { + return int(group.ResourceOpts.Replicas + 1) +} + +func (r *repairScheduler) registerServerToGossip() func(server *grpclib.Server) { + s := newRepairGossipServer(r) + return func(server *grpclib.Server) { + propertyv1.RegisterRepairServiceServer(server, s) + } +} + +func (r *repairScheduler) registerClientToGossip(messenger gossip.Messenger) { + messenger.Subscribe(newRepairGossipClient(r)) +} + type repairSchedulerMetrics struct { - totalRepairBuildTreeStarted meter.Counter - totalRepairBuildTreeFinished meter.Counter - totalRepairBuildTreeFailures meter.Counter - totalRepairBuildTreeLatency meter.Counter + totalRepairBuildTreeStarted meter.Counter + totalRepairBuildTreeFinished meter.Counter + totalRepairBuildTreeFailures meter.Counter + totalRepairBuildTreeLatency meter.Counter + totalRepairBuildTreeConflicts meter.Counter + + totalRepairSuccessCount meter.Counter + totalRepairFailedCount meter.Counter } func newRepairSchedulerMetrics(omr *observability.Factory) *repairSchedulerMetrics { return &repairSchedulerMetrics{ - totalRepairBuildTreeStarted: omr.NewCounter("repair_build_tree_started"), - totalRepairBuildTreeFinished: omr.NewCounter("repair_build_tree_finished"), - totalRepairBuildTreeFailures: omr.NewCounter("repair_build_tree_failures"), - totalRepairBuildTreeLatency: omr.NewCounter("repair_build_tree_latency"), + totalRepairBuildTreeStarted: omr.NewCounter("repair_build_tree_started"), + totalRepairBuildTreeFinished: omr.NewCounter("repair_build_tree_finished"), + totalRepairBuildTreeFailures: omr.NewCounter("repair_build_tree_failures"), + totalRepairBuildTreeLatency: omr.NewCounter("repair_build_tree_latency"), + totalRepairBuildTreeConflicts: omr.NewCounter("repair_build_tree_conflicts"), + + totalRepairSuccessCount: omr.NewCounter("property_repair_success_count", "group", "shard"), + totalRepairFailedCount: omr.NewCounter("property_repair_failure_count", "group", "shard"), } } diff --git a/banyand/property/repair_gossip.go b/banyand/property/repair_gossip.go new file mode 100644 index 00000000..91d11e4a --- /dev/null +++ b/banyand/property/repair_gossip.go @@ -0,0 +1,891 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package property + +import ( + "context" + "fmt" + "io" + "strings" + + "github.com/pkg/errors" + grpclib "google.golang.org/grpc" + "google.golang.org/protobuf/encoding/protojson" + + "github.com/apache/skywalking-banyandb/api/common" + propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" + "github.com/apache/skywalking-banyandb/banyand/property/gossip" + "github.com/apache/skywalking-banyandb/pkg/index/inverted" +) + +var ( + gossipMerkleTreeReadPageSize int64 = 10 + gossipShardQueryDatabaseSize = 100 +) + +type repairGossipBase struct { + scheduler *repairScheduler +} + +func (b *repairGossipBase) getTreeReader(ctx context.Context, group string, shardID uint32) (repairTreeReader, bool, error) { + s, err := b.scheduler.db.loadShard(ctx, common.ShardID(shardID)) + if err != nil { + return nil, false, fmt.Errorf("failed to load shard %d: %w", shardID, err) + } + tree, err := s.repairState.treeReader(group) + if err != nil { + return nil, false, fmt.Errorf("failed to get tree reader for group %s: %w", group, err) + } + if tree == nil { + // if the tree is nil, but the state file exist, means the tree(group) is empty + stateExist, err := s.repairState.stateFileExist() + if err != nil { + return nil, false, fmt.Errorf("failed to check state file existence for group %s: %w", group, err) + } + // if the tree is nil, it means the tree is no data + return &emptyRepairTreeReader{}, stateExist, nil + } + return tree, true, nil +} + +func (b *repairGossipBase) sendTreeSummary( + reader repairTreeReader, + group string, + shardID uint32, + stream grpclib.BidiStreamingClient[propertyv1.RepairRequest, propertyv1.RepairResponse], +) (root *repairTreeNode, rootMatches bool, err error) { + roots, err := reader.read(nil, 1, false) + if err != nil { + return nil, false, fmt.Errorf("failed to read tree root: %w", err) + } + if len(roots) == 0 { + return nil, false, fmt.Errorf("tree root is empty for group %s", group) + } + + err = stream.Send(&propertyv1.RepairRequest{ + Data: &propertyv1.RepairRequest_TreeRoot{ + TreeRoot: &propertyv1.TreeRoot{ + Group: group, + ShardId: shardID, + RootSha: roots[0].shaValue, + }, + }, + }) + if err != nil { + return nil, false, fmt.Errorf("failed to send tree root for group %s: %w", group, err) + } + + recv, err := stream.Recv() + if err != nil { + return nil, false, fmt.Errorf("failed to receive tree summary response for group %s: %w", group, err) + } + rootCompare, ok := recv.Data.(*propertyv1.RepairResponse_RootCompare) + if !ok { + return nil, false, fmt.Errorf("unexpected response type: %T, expected RootCompare", recv.Data) + } + if !rootCompare.RootCompare.TreeFound { + return nil, false, fmt.Errorf("server tree not found for group: %s", group) + } + if rootCompare.RootCompare.RootShaMatch { + return roots[0], true, nil + } + + var slots []*repairTreeNode + for { + slots, err = reader.read(roots[0], gossipMerkleTreeReadPageSize, false) + if err != nil { + return nil, false, fmt.Errorf("failed to read slots for group %s: %w", group, err) + } + if len(slots) == 0 { + break + } + + slotReq := &propertyv1.TreeSlots{ + 
SlotSha: make([]*propertyv1.TreeSlotSHA, 0, len(slots)), + } + for _, s := range slots { + slotReq.SlotSha = append(slotReq.SlotSha, &propertyv1.TreeSlotSHA{ + Slot: s.slotInx, + Value: s.shaValue, + }) + } + + err = stream.Send(&propertyv1.RepairRequest{ + Data: &propertyv1.RepairRequest_TreeSlots{ + TreeSlots: slotReq, + }, + }) + if err != nil { + return nil, false, fmt.Errorf("failed to send tree slots for group %s: %w", group, err) + } + } + + // send the empty slots list means there are no more slots to send + err = stream.Send(&propertyv1.RepairRequest{ + Data: &propertyv1.RepairRequest_TreeSlots{ + TreeSlots: &propertyv1.TreeSlots{ + SlotSha: []*propertyv1.TreeSlotSHA{}, + }, + }, + }) + if err != nil { + return nil, false, fmt.Errorf("failed to send empty tree slots for group %s: %w", group, err) + } + + return roots[0], false, nil +} + +func (b *repairGossipBase) queryProperty(ctx context.Context, syncShard *shard, leafNodeEntity string) (*queryProperty, *propertyv1.Property, error) { + g, n, entity, err := syncShard.repairState.parseLeafNodeEntity(leafNodeEntity) + if err != nil { + return nil, nil, fmt.Errorf("failed to parse leaf node entity %s: %w", leafNodeEntity, err) + } + searchQuery, err := inverted.BuildPropertyQueryFromEntity(groupField, g, n, entityID, entity) + if err != nil { + return nil, nil, fmt.Errorf("failed to build query from leaf node entity %s: %w", leafNodeEntity, err) + } + queriedProperties, err := syncShard.search(ctx, searchQuery, gossipShardQueryDatabaseSize) + if err != nil { + return nil, nil, fmt.Errorf("failed to search properties for leaf node entity %s: %w", leafNodeEntity, err) + } + var latestProperty *queryProperty + for _, queried := range queriedProperties { + if latestProperty == nil || queried.timestamp > latestProperty.timestamp { + latestProperty = queried + } + } + if latestProperty == nil { + return nil, nil, nil + } + var p propertyv1.Property + err = protojson.Unmarshal(latestProperty.source, &p) + if err != nil { + return nil, nil, fmt.Errorf("failed to unmarshal property from leaf node entity %s: %w", leafNodeEntity, err) + } + return latestProperty, &p, nil +} + +type repairGossipClient struct { + repairGossipBase +} + +func newRepairGossipClient(s *repairScheduler) *repairGossipClient { + return &repairGossipClient{ + repairGossipBase: repairGossipBase{ + scheduler: s, + }, + } +} + +func (r *repairGossipClient) Rev(ctx context.Context, nextNode *grpclib.ClientConn, request *propertyv1.PropagationRequest) error { + client := propertyv1.NewRepairServiceClient(nextNode) + var hasPropertyUpdated bool + defer func() { + if hasPropertyUpdated { + err := r.scheduler.buildingTree([]common.ShardID{common.ShardID(request.ShardId)}, request.Group, true) + if err != nil { + r.scheduler.l.Warn().Err(err).Msgf("failed to rebuild tree for group %s, shard %d", request.Group, request.ShardId) + } + } + }() + reader, found, err := r.getTreeReader(ctx, request.Group, request.ShardId) + if err != nil { + return errors.Wrapf(gossip.ErrAbortPropagation, "failed to get tree reader on client side: %v", err) + } + if !found { + return errors.Wrapf(gossip.ErrAbortPropagation, "tree for group %s, shard %d not found on client side", request.Group, request.ShardId) + } + defer reader.close() + + stream, err := client.Repair(ctx) + if err != nil { + return fmt.Errorf("failed to create repair stream: %w", err) + } + defer func() { + _ = stream.CloseSend() + }() + + // step 1: send merkle tree data + rootNode, rootMatch, err := r.sendTreeSummary(reader, 
request.Group, request.ShardId, stream) + if err != nil { + // if the tree summary cannot be built, we should abort the propagation + return errors.Wrapf(gossip.ErrAbortPropagation, "failed to query/send tree summary on client side: %v", err) + } + // if the root node matched, then ignore the repair + if rootMatch { + return nil + } + + syncShard, err := r.scheduler.db.loadShard(ctx, common.ShardID(request.ShardId)) + if err != nil { + return errors.Wrapf(gossip.ErrAbortPropagation, "shard %d load failure on client side: %v", request.ShardId, err) + } + firstTreeSummaryResp := true + + leafReader := newRepairBufferLeafReader(reader) + var currentComparingClientNode *repairTreeNode + var notProcessingClientNode *repairTreeNode + for { + recvResp, err := stream.Recv() + if err != nil { + if errors.Is(err, io.EOF) { + return nil + } + return fmt.Errorf("failed to keep receive tree summary from server: %w", err) + } + + switch resp := recvResp.Data.(type) { + case *propertyv1.RepairResponse_DifferTreeSummary: + // step 2: check with the server for different leaf nodes + // there no different nodes, we can skip repair + if firstTreeSummaryResp && len(resp.DifferTreeSummary.Nodes) == 0 { + return nil + } + r.handleDifferSummaryFromServer(ctx, stream, resp.DifferTreeSummary, reader, syncShard, rootNode, leafReader, ¬ProcessingClientNode, ¤tComparingClientNode) + firstTreeSummaryResp = false + case *propertyv1.RepairResponse_PropertySync: + // step 3: keep receiving messages from the server + // if the server still sending different nodes, we should keep reading them + // if the server sends a PropertySync, we should repair the property and send the newer property back to the server if needed + sync := resp.PropertySync + updated, newer, err := syncShard.repair(ctx, sync.Id, sync.Property, sync.DeleteTime) + if err != nil { + r.scheduler.l.Warn().Err(err).Msgf("failed to repair property %s", sync.Id) + r.scheduler.metrics.totalRepairFailedCount.Inc(1, request.Group, fmt.Sprintf("%d", request.ShardId)) + } + if updated { + r.scheduler.l.Debug().Msgf("successfully repaired property %s on client side", sync.Id) + r.scheduler.metrics.totalRepairSuccessCount.Inc(1, request.Group, fmt.Sprintf("%d", request.ShardId)) + hasPropertyUpdated = true + continue + } + // if the property hasn't been updated, and the newer property is not nil, + // which means the property is newer than the server side, + if !updated && newer != nil { + var p propertyv1.Property + err = protojson.Unmarshal(newer.source, &p) + if err != nil { + r.scheduler.l.Warn().Err(err).Msgf("failed to unmarshal property from db by entity %s", newer.id) + continue + } + // send the newer property to the server + err = stream.Send(&propertyv1.RepairRequest{ + Data: &propertyv1.RepairRequest_PropertySync{ + PropertySync: &propertyv1.PropertySync{ + Id: newer.id, + Property: &p, + DeleteTime: newer.deleteTime, + }, + }, + }) + if err != nil { + r.scheduler.l.Warn().Err(err).Msgf("failed to send newer property sync response to server, entity: %s", newer.id) + } + } + default: + r.scheduler.l.Warn().Msgf("unexpected response type: %T, expected DifferTreeSummary or PropertySync", resp) + } + } +} + +func (r *repairGossipClient) sendPropertyMissing(stream grpclib.BidiStreamingClient[propertyv1.RepairRequest, propertyv1.RepairResponse], entity string) { + err := stream.Send(&propertyv1.RepairRequest{ + Data: &propertyv1.RepairRequest_PropertyMissing{ + PropertyMissing: &propertyv1.PropertyMissing{ + Entity: entity, + }, + }, + }) + if err != nil { + 
r.scheduler.l.Warn().Err(err).Msgf("failed to send property missing response to client, entity: %s", entity) + } +} + +func (r *repairGossipClient) handleDifferSummaryFromServer( + ctx context.Context, + stream grpclib.BidiStreamingClient[propertyv1.RepairRequest, propertyv1.RepairResponse], + differTreeSummary *propertyv1.DifferTreeSummary, + reader repairTreeReader, + syncShard *shard, + rootNode *repairTreeNode, + bufSlotReader *repairBufferLeafReader, + notProcessingClientNode **repairTreeNode, + currentComparingClientNode **repairTreeNode, +) { + // if their no more different nodes, means the client side could be send the no more property sync request to notify the server + if len(differTreeSummary.Nodes) == 0 { + // if the current comparing client nodes still not empty, means the client side has leaf nodes that are not processed yet + // then queried and sent property to server + if *currentComparingClientNode != nil { + // reading all reduced properties from the client side, and send to the server + r.readingReduceLeafAndSendProperties(ctx, syncShard, stream, bufSlotReader, *currentComparingClientNode) + *currentComparingClientNode = nil + } + if *notProcessingClientNode != nil { + // if there still have difference client node not processing, means the client has property but server don't have + // then queried and sent property to server + r.queryPropertyAndSendToServer(ctx, syncShard, (*notProcessingClientNode).entity, stream) + } + err := stream.Send(&propertyv1.RepairRequest{ + Data: &propertyv1.RepairRequest_NoMorePropertySync{ + NoMorePropertySync: &propertyv1.NoMorePropertySync{}, + }, + }) + if err != nil { + r.scheduler.l.Warn().Err(err).Msgf("failed to send no more property sync request to server") + } + return + } + + // keep reading the tree summary until there are no more different nodes + for _, node := range differTreeSummary.Nodes { + select { + case <-ctx.Done(): + r.scheduler.l.Warn().Msgf("context done while handling differ summary from server") + return + default: + break + } + // if the repair node doesn't exist in the server side, then should send all the real property data to server + if !node.Exists { + clientSlotNode, findError := r.findSlotNodeByRoot(reader, rootNode, node.SlotIndex) + if findError != nil { + r.scheduler.l.Warn().Err(findError).Msgf("client slot %d not exist", node.SlotIndex) + continue + } + // read the leaf nodes from the client side and send properties to the server + r.readingReduceLeafAndSendProperties(ctx, syncShard, stream, bufSlotReader, clientSlotNode) + continue + } + + needsFindSlot := false + if *currentComparingClientNode != nil && (*currentComparingClientNode).slotInx != node.SlotIndex { + // the comparing node has changed, checks the client side still has reduced properties or not + // reading all reduced properties from the client side, and send to the server + r.readingReduceLeafAndSendProperties(ctx, syncShard, stream, bufSlotReader, *currentComparingClientNode) + needsFindSlot = true + } else if *currentComparingClientNode == nil { + needsFindSlot = true + } + + if needsFindSlot { + clientSlotNode, findError := r.findSlotNodeByRoot(reader, rootNode, node.SlotIndex) + // if slot not exists in client side, then the client should ask the server for the property data of leaf nodes + if findError != nil { + r.sendPropertyMissing(stream, node.Entity) + continue + } + *currentComparingClientNode = clientSlotNode + } + + r.readingClientNodeAndCompare(ctx, syncShard, node, bufSlotReader, *currentComparingClientNode, stream, 
notProcessingClientNode) + } +} + +func (r *repairGossipClient) readingReduceLeafAndSendProperties( + ctx context.Context, + syncShard *shard, + stream grpclib.BidiStreamingClient[propertyv1.RepairRequest, propertyv1.RepairResponse], + reader *repairBufferLeafReader, parent *repairTreeNode, +) { + // read the leaf nodes from the client side + for { + leafNode, err := reader.next(parent) + if err != nil { + r.scheduler.l.Warn().Err(err).Msgf("failed to read leaf nodes from client side") + break + } + if leafNode == nil { + break + } + // reading the real property data from the leaf nodes and sending to the server + r.queryPropertyAndSendToServer(ctx, syncShard, leafNode.entity, stream) + } +} + +func (r *repairGossipClient) readingClientNodeAndCompare( + ctx context.Context, + syncShard *shard, + serverNode *propertyv1.TreeLeafNode, + bufReader *repairBufferLeafReader, + clientSlotNode *repairTreeNode, + stream grpclib.BidiStreamingClient[propertyv1.RepairRequest, propertyv1.RepairResponse], + notProcessingClientNode **repairTreeNode, +) { + var clientLeafNode *repairTreeNode + // if the latest node is not nil, means the client side has a leaf node that is not processed yet + if *notProcessingClientNode != nil { + clientLeafNode = *notProcessingClientNode + } else { + node, err := bufReader.next(clientSlotNode) + if err != nil { + r.sendPropertyMissing(stream, serverNode.Entity) + return + } + clientLeafNode = node + } + + // if the client current leaf node is nil, means the client side doesn't have the leaf node, + // we should send the property missing request to the server + if clientLeafNode == nil { + r.sendPropertyMissing(stream, serverNode.Entity) + return + } + + // compare the entity of the server leaf node with the client leaf node + entityCompare := strings.Compare(serverNode.Entity, clientLeafNode.entity) + if entityCompare == 0 { + // if the entity is the same, check the sha value + if serverNode.Sha != clientLeafNode.shaValue { + r.queryPropertyAndSendToServer(ctx, syncShard, serverNode.Entity, stream) + } + *notProcessingClientNode = nil + return + } else if entityCompare < 0 { + // if the entity of the server leaf node is less than the client leaf node, + // it means the server leaf node does not exist in the client leaf nodes + r.sendPropertyMissing(stream, serverNode.Entity) + // means the client node is still not processing, waiting for the server node to compare + *notProcessingClientNode = clientLeafNode + return + } + // otherwise, the entity of the server leaf node is greater than the client leaf node, + // it means the client leaf node does not exist in the server leaf nodes, + + // we should query the property from the client side and send it to the server + r.queryPropertyAndSendToServer(ctx, syncShard, clientLeafNode.entity, stream) + // cleanup the unprocess node, and let the client side leaf nodes keep reading + *notProcessingClientNode = nil + // cycle to read the next leaf node from the client side to make sure they have synced to the same entity + r.readingClientNodeAndCompare(ctx, syncShard, serverNode, bufReader, clientSlotNode, stream, notProcessingClientNode) +} + +func (r *repairGossipClient) queryPropertyAndSendToServer( + ctx context.Context, + syncShard *shard, + entity string, + stream grpclib.BidiStreamingClient[propertyv1.RepairRequest, propertyv1.RepairResponse], +) { + // otherwise, we need to send the property to the server + property, p, err := r.queryProperty(ctx, syncShard, entity) + if err != nil { + 
r.scheduler.l.Warn().Err(err).Msgf("failed to query property for leaf node entity %s", entity) + return + } + if property == nil { + return + } + // send the property to the server + err = stream.Send(&propertyv1.RepairRequest{ + Data: &propertyv1.RepairRequest_PropertySync{ + PropertySync: &propertyv1.PropertySync{ + Id: GetPropertyID(p), + Property: p, + DeleteTime: property.deleteTime, + }, + }, + }) + if err != nil { + r.scheduler.l.Warn().Err(err).Msgf("failed to send property sync request to server, entity: %s", entity) + } +} + +func (r *repairGossipClient) findSlotNodeByRoot(reader repairTreeReader, root *repairTreeNode, index int32) (*repairTreeNode, error) { + firstRead := true + for { + slotNodes, err := reader.read(root, gossipMerkleTreeReadPageSize, firstRead) + if err != nil { + return nil, err + } + if len(slotNodes) == 0 { + return nil, fmt.Errorf("failed to find slot node by root: %d", index) + } + for _, s := range slotNodes { + if s.slotInx == index { + return s, nil + } + } + + firstRead = false + } +} + +type repairGossipServer struct { + propertyv1.UnimplementedRepairServiceServer + repairGossipBase +} + +func newRepairGossipServer(s *repairScheduler) *repairGossipServer { + return &repairGossipServer{ + repairGossipBase: repairGossipBase{ + scheduler: s, + }, + } +} + +func (r *repairGossipServer) Repair(s grpclib.BidiStreamingServer[propertyv1.RepairRequest, propertyv1.RepairResponse]) error { + summary, reader, err := r.combineTreeSummary(s) + if err != nil { + return fmt.Errorf("failed to receive tree summary request: %w", err) + } + defer reader.close() + // if no need to compare the tree, we can skip the rest of the process + if summary.ignoreCompare { + r.scheduler.l.Debug().Msgf("tree root for group %s, shard %d is the same, skip tree slots", summary.group, summary.shardID) + return nil + } + group := summary.group + shardID := summary.shardID + var hasPropertyUpdated bool + defer func() { + if hasPropertyUpdated { + err = r.scheduler.buildingTree([]common.ShardID{common.ShardID(shardID)}, group, true) + if err != nil { + r.scheduler.l.Warn().Err(err).Msgf("failed to build tree for group %s", group) + } + } + }() + + serverSlotNodes, err := reader.read(summary.rootNode, int64(r.scheduler.treeSlotCount), false) + if err != nil { + r.scheduler.l.Warn().Err(err).Msgf("failed to read slot nodes on server side") + return r.sendEmptyDiffer(s) + } + // client missing slots or server slots with different SHA values + clientMismatchSlots := make([]*repairTreeNode, 0) + // server missing slots + serverMissingSlots := make([]int32, 0) + clientSlotMap := make(map[int32]string, len(summary.slots)) + for _, clientSlot := range summary.slots { + clientSlotMap[clientSlot.index] = clientSlot.shaValue + } + serverSlotSet := make(map[int32]bool, len(serverSlotNodes)) + for _, serverSlot := range serverSlotNodes { + serverSlotSet[serverSlot.slotInx] = true + // if the client slot exists but the SHA value is different, or client slot does not exist + // then we should add it to the client mismatch slots + if clientSha, ok := clientSlotMap[serverSlot.slotInx]; ok && clientSha != serverSlot.shaValue { + clientMismatchSlots = append(clientMismatchSlots, serverSlot) + } else if !ok { + clientMismatchSlots = append(clientMismatchSlots, serverSlot) + } + } + // if the client slot exists but the server slot does not exist, we should add it to the server missing slots + for _, clientSlot := range summary.slots { + if _, ok := serverSlotSet[clientSlot.index]; !ok { + serverMissingSlots = 
append(serverMissingSlots, clientSlot.index) + } + } + sent, err := r.sendDifferSlots(reader, clientMismatchSlots, serverMissingSlots, s) + if err != nil { + r.scheduler.l.Warn().Err(err).Msgf("failed to send different slots to client") + } + // send the tree and no more different slots needs to be sent + err = r.sendEmptyDiffer(s) + if !sent { + return err + } else if err != nil { + // should keep the message receiving loop + r.scheduler.l.Warn().Msgf("sent no difference slot to client failure, error: %v", err) + } + for { + missingOrSyncRequest, err := s.Recv() + if err != nil { + if errors.Is(err, io.EOF) { + return nil + } + return fmt.Errorf("failed to receive missing or sync request: %w", err) + } + syncShard, err := r.scheduler.db.loadShard(s.Context(), common.ShardID(shardID)) + if err != nil { + return fmt.Errorf("shard %d load failure on server side: %w", shardID, err) + } + switch req := missingOrSyncRequest.Data.(type) { + case *propertyv1.RepairRequest_PropertyMissing: + r.processPropertyMissing(s.Context(), syncShard, req.PropertyMissing, s) + case *propertyv1.RepairRequest_PropertySync: + if r.processPropertySync(s.Context(), syncShard, req.PropertySync, s, group) { + hasPropertyUpdated = true + } + case *propertyv1.RepairRequest_NoMorePropertySync: + // if the client has no more property sync, the server side should stop the sync + return nil + } + } +} + +func (r *repairGossipServer) combineTreeSummary( + s grpclib.BidiStreamingServer[propertyv1.RepairRequest, propertyv1.RepairResponse], +) (*repairTreeSummary, repairTreeReader, error) { + var summary *repairTreeSummary + var reader repairTreeReader + for { + recvData, err := s.Recv() + if err != nil { + r.scheduler.l.Warn().Err(err).Msgf("failed to receive tree summary from client") + return nil, nil, err + } + + switch data := recvData.Data.(type) { + case *propertyv1.RepairRequest_TreeRoot: + summary, reader, err = r.handleTreeRootRequest(s, data) + if err != nil { + if sendError := r.sendRootCompare(s, false, false); sendError != nil { + r.scheduler.l.Warn().Err(sendError).Msgf("failed to send root compare response to client") + } + return nil, nil, err + } + if err = r.sendRootCompare(s, true, summary.ignoreCompare); err != nil { + _ = reader.close() + return nil, nil, err + } + if summary.ignoreCompare { + return summary, reader, nil + } + case *propertyv1.RepairRequest_TreeSlots: + if len(data.TreeSlots.SlotSha) == 0 { + return summary, reader, nil + } + for _, slot := range data.TreeSlots.SlotSha { + summary.slots = append(summary.slots, &repairTreeSummarySlot{ + index: slot.Slot, + shaValue: slot.Value, + }) + } + default: + r.scheduler.l.Warn().Msgf("unexpected data type: %T, expected TreeRoot or TreeSlots", data) + } + } +} + +func (r *repairGossipServer) handleTreeRootRequest( + s grpclib.BidiStreamingServer[propertyv1.RepairRequest, propertyv1.RepairResponse], + req *propertyv1.RepairRequest_TreeRoot, +) (*repairTreeSummary, repairTreeReader, error) { + summary := &repairTreeSummary{ + group: req.TreeRoot.Group, + shardID: req.TreeRoot.ShardId, + } + + reader, exist, err := r.getTreeReader(s.Context(), summary.group, summary.shardID) + if err != nil || !exist { + return nil, nil, fmt.Errorf("tree not found or not exist: %w", err) + } + rootNode, err := reader.read(nil, 1, false) + if err != nil { + _ = reader.close() + return nil, nil, fmt.Errorf("failed to read tree root for group %s: %w", summary.group, err) + } + if len(rootNode) == 0 { + _ = reader.close() + return nil, nil, fmt.Errorf("failed to read tree 
root for group %s: %w", summary.group, err) + } + summary.rootNode = rootNode[0] + summary.ignoreCompare = req.TreeRoot.RootSha == rootNode[0].shaValue + return summary, reader, nil +} + +func (r *repairGossipServer) sendRootCompare(s grpclib.BidiStreamingServer[propertyv1.RepairRequest, propertyv1.RepairResponse], treeFound, rootMatch bool) error { + return s.Send(&propertyv1.RepairResponse{ + Data: &propertyv1.RepairResponse_RootCompare{ + RootCompare: &propertyv1.RootCompare{ + TreeFound: treeFound, + RootShaMatch: rootMatch, + }, + }, + }) +} + +func (r *repairGossipServer) processPropertySync( + ctx context.Context, + syncShard *shard, + sync *propertyv1.PropertySync, + s grpclib.BidiStreamingServer[propertyv1.RepairRequest, propertyv1.RepairResponse], + group string, +) bool { + updated, newer, err := syncShard.repair(ctx, sync.Id, sync.Property, sync.DeleteTime) + if err != nil { + r.scheduler.l.Warn().Err(err).Msgf("failed to repair property %s from server side", sync.Id) + r.scheduler.metrics.totalRepairFailedCount.Inc(1, group, fmt.Sprintf("%d", syncShard.id)) + return false + } + if updated { + r.scheduler.l.Debug().Msgf("successfully repaired property %s on server side", sync.Id) + r.scheduler.metrics.totalRepairSuccessCount.Inc(1, group, fmt.Sprintf("%d", syncShard.id)) + } + if !updated && newer != nil { + // if the property hasn't been updated, and the newer property is not nil, + // which means the property is newer than the client side, + var p propertyv1.Property + err = protojson.Unmarshal(newer.source, &p) + if err != nil { + r.scheduler.l.Warn().Err(err).Msgf("failed to unmarshal property from db by entity %s", newer.id) + return false + } + // send the newer property to the client + err = s.Send(&propertyv1.RepairResponse{ + Data: &propertyv1.RepairResponse_PropertySync{ + PropertySync: &propertyv1.PropertySync{ + Id: newer.id, + Property: &p, + DeleteTime: newer.deleteTime, + }, + }, + }) + if err != nil { + r.scheduler.l.Warn().Err(err).Msgf("failed to send newer property sync response to client, entity: %s", newer.id) + return false + } + } + return updated +} + +func (r *repairGossipServer) processPropertyMissing( + ctx context.Context, + syncShard *shard, + missing *propertyv1.PropertyMissing, + s grpclib.BidiStreamingServer[propertyv1.RepairRequest, propertyv1.RepairResponse], +) { + property, data, err := r.queryProperty(ctx, syncShard, missing.Entity) + if err != nil { + r.scheduler.l.Warn().Err(err).Msgf("failed to query client missing property from server side: %s", missing.Entity) + return + } + if property == nil { + return + } + err = s.Send(&propertyv1.RepairResponse{ + Data: &propertyv1.RepairResponse_PropertySync{ + PropertySync: &propertyv1.PropertySync{ + Id: property.id, + Property: data, + DeleteTime: property.deleteTime, + }, + }, + }) + if err != nil { + r.scheduler.l.Warn().Err(err).Msgf("failed to send property sync response to client, entity: %s", missing.Entity) + return + } +} + +func (r *repairGossipServer) sendDifferSlots( + reader repairTreeReader, + clientMismatchSlots []*repairTreeNode, + serverMissingSlots []int32, + s grpclib.BidiStreamingServer[propertyv1.RepairRequest, propertyv1.RepairResponse], +) (hasSent bool, err error) { + var leafNodes []*repairTreeNode + + // send server mismatch slots to the client + for _, node := range clientMismatchSlots { + for { + leafNodes, err = reader.read(node, gossipMerkleTreeReadPageSize, false) + if err != nil { + return hasSent, fmt.Errorf("failed to read leaf nodes for slot %d: %w", 
node.slotInx, err) + } + // if there are no more leaf nodes, we can skip this slot + if len(leafNodes) == 0 { + break + } + mismatchLeafNodes := make([]*propertyv1.TreeLeafNode, 0, len(leafNodes)) + for _, leafNode := range leafNodes { + mismatchLeafNodes = append(mismatchLeafNodes, &propertyv1.TreeLeafNode{ + SlotIndex: node.slotInx, + Exists: true, + Entity: leafNode.entity, + Sha: leafNode.shaValue, + }) + } + // send the leaf nodes to the client + err = s.Send(&propertyv1.RepairResponse{ + Data: &propertyv1.RepairResponse_DifferTreeSummary{ + DifferTreeSummary: &propertyv1.DifferTreeSummary{ + Nodes: mismatchLeafNodes, + }, + }, + }) + if err != nil { + r.scheduler.l.Warn().Err(err). + Msgf("failed to send leaf nodes for slot %d", node.slotInx) + } else { + hasSent = true + } + } + } + + // send server missing slots to the client + missingSlots := make([]*propertyv1.TreeLeafNode, 0, len(serverMissingSlots)) + for _, missingSlot := range serverMissingSlots { + missingSlots = append(missingSlots, &propertyv1.TreeLeafNode{ + SlotIndex: missingSlot, + Exists: false, + }) + } + if len(missingSlots) > 0 { + // send the missing slots to the client + err = s.Send(&propertyv1.RepairResponse{ + Data: &propertyv1.RepairResponse_DifferTreeSummary{ + DifferTreeSummary: &propertyv1.DifferTreeSummary{ + Nodes: missingSlots, + }, + }, + }) + if err != nil { + r.scheduler.l.Warn().Err(err). + Msgf("failed to send missing slots") + } else { + hasSent = true + } + } + return hasSent, nil +} + +func (r *repairGossipServer) sendEmptyDiffer(s grpclib.BidiStreamingServer[propertyv1.RepairRequest, propertyv1.RepairResponse]) error { + return s.Send(&propertyv1.RepairResponse{ + Data: &propertyv1.RepairResponse_DifferTreeSummary{ + DifferTreeSummary: &propertyv1.DifferTreeSummary{}, + }, + }) +} + +type repairTreeSummary struct { + rootNode *repairTreeNode + group string + slots []*repairTreeSummarySlot + shardID uint32 + ignoreCompare bool +} + +type repairTreeSummarySlot struct { + shaValue string + index int32 +} + +type emptyRepairTreeReader struct{} + +func (e *emptyRepairTreeReader) read(n *repairTreeNode, _ int64, _ bool) ([]*repairTreeNode, error) { + if n == nil { + return []*repairTreeNode{{tp: repairTreeNodeTypeRoot}}, nil + } + return nil, nil +} + +func (e *emptyRepairTreeReader) close() error { + return nil +} diff --git a/banyand/property/repair_gossip_test.go b/banyand/property/repair_gossip_test.go new file mode 100644 index 00000000..727d85fb --- /dev/null +++ b/banyand/property/repair_gossip_test.go @@ -0,0 +1,581 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package property + +import ( + "context" + "fmt" + "net" + "path" + "path/filepath" + "sync" + "testing" + "time" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + "go.uber.org/mock/gomock" + "go.uber.org/multierr" + "google.golang.org/grpc" + + "github.com/apache/skywalking-banyandb/api/common" + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" + "github.com/apache/skywalking-banyandb/banyand/internal/storage" + "github.com/apache/skywalking-banyandb/banyand/metadata" + "github.com/apache/skywalking-banyandb/banyand/metadata/schema" + "github.com/apache/skywalking-banyandb/banyand/observability" + "github.com/apache/skywalking-banyandb/banyand/property/gossip" + "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/index/inverted" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/run" + "github.com/apache/skywalking-banyandb/pkg/test" + "github.com/apache/skywalking-banyandb/pkg/test/flags" +) + +const ( + testGroup1 = "test-group1" +) + +func TestPropertyRepairGossip(t *testing.T) { + gomega.RegisterFailHandler(ginkgo.Fail) + ginkgo.RunSpecs(t, "Property Repair Gossip Suite") +} + +var _ = ginkgo.Describe("Property repair gossip", func() { + gomega.Expect(logger.Init(logger.Logging{ + Env: "dev", + Level: flags.LogLevel, + })).To(gomega.Succeed()) + + var ctrl *gomock.Controller + var nodes []*nodeContext + ginkgo.BeforeEach(func() { + ctrl = gomock.NewController(ginkgo.GinkgoT()) + gomega.Expect(ctrl).NotTo(gomega.BeNil(), "gomock controller should not be nil") + }) + + ginkgo.AfterEach(func() { + for _, node := range nodes { + if node != nil { + node.stopAll() + } + } + nodes = nil + }) + + ginkgo.It("all repair tree not built", func() { + startingEachTest(&nodes, ctrl, &testCase{ + groups: []group{ + {name: testGroup1, shardCount: 2, replicasCount: 1}, + }, + nodes: []node{ + {properties: []property{{group: testGroup1, shard: 0, id: "1", version: 1}}}, + {properties: []property{{group: testGroup1, shard: 0, id: "1", version: 2}}}, + }, + propagation: func(nodes []*nodeContext) error { + return nodes[0].messenger.Propagation([]string{nodes[0].nodeID, nodes[1].nodeID}, testGroup1, 0) + }, + result: func(original []node) []node { + // all nodes should keep their existing properties + return original + }, + }) + }) + + ginkgo.It("all repair tree with client built", func() { + startingEachTest(&nodes, ctrl, &testCase{ + groups: []group{ + {name: testGroup1, shardCount: 2, replicasCount: 1}, + }, + nodes: []node{ + {properties: []property{{group: testGroup1, shard: 0, id: "1", version: 1}}, treeBuilt: true}, + {properties: []property{{group: testGroup1, shard: 0, id: "1", version: 2}}}, + }, + propagation: func(nodes []*nodeContext) error { + return nodes[0].messenger.Propagation([]string{nodes[0].nodeID, nodes[1].nodeID}, testGroup1, 0) + }, + result: func(original []node) []node { + // all nodes should keep their existing properties + return original + }, + }) + }) + + ginkgo.It("gossip two data nodes with client version < server version", func() { + startingEachTest(&nodes, ctrl, &testCase{ + groups: []group{ + {name: testGroup1, shardCount: 2, replicasCount: 1}, + }, + nodes: []node{ + {properties: []property{{group: testGroup1, shard: 0, id: "1", version: 1}}, treeBuilt: true}, + {properties: 
[]property{{group: testGroup1, shard: 0, id: "1", version: 2}}, treeBuilt: true}, + }, + propagation: func(nodes []*nodeContext) error { + return nodes[0].messenger.Propagation([]string{nodes[0].nodeID, nodes[1].nodeID}, testGroup1, 0) + }, + result: func(original []node) []node { + // the first node property should be updated to the version 2 + original[0].properties[0].version = 2 + return original + }, + }) + }) + + ginkgo.It("gossip two data nodes with client version < server version", func() { + startingEachTest(&nodes, ctrl, &testCase{ + groups: []group{ + {name: testGroup1, shardCount: 2, replicasCount: 1}, + }, + nodes: []node{ + {properties: []property{{group: testGroup1, shard: 0, id: "1", version: 2}}, treeBuilt: true}, + {properties: []property{{group: testGroup1, shard: 0, id: "1", version: 1}}, treeBuilt: true}, + }, + propagation: func(nodes []*nodeContext) error { + return nodes[0].messenger.Propagation([]string{nodes[0].nodeID, nodes[1].nodeID}, testGroup1, 0) + }, + result: func(original []node) []node { + // the first node property should be updated to version 2 + original[1].properties[0].version = 2 + return original + }, + }) + }) + + ginkgo.It("gossip two data nodes with client version = server version", func() { + startingEachTest(&nodes, ctrl, &testCase{ + groups: []group{ + {name: testGroup1, shardCount: 2, replicasCount: 1}, + }, + nodes: []node{ + {properties: []property{{group: testGroup1, shard: 0, id: "1", version: 2}}, treeBuilt: true}, + {properties: []property{{group: testGroup1, shard: 0, id: "1", version: 2}}, treeBuilt: true}, + }, + propagation: func(nodes []*nodeContext) error { + return nodes[0].messenger.Propagation([]string{nodes[0].nodeID, nodes[1].nodeID}, testGroup1, 0) + }, + result: func(original []node) []node { + // keep the properties as is, since the versions are equal + return original + }, + }) + }) + + ginkgo.It("gossip two data nodes with client missing but server exist", func() { + startingEachTest(&nodes, ctrl, &testCase{ + groups: []group{ + {name: testGroup1, shardCount: 2, replicasCount: 1}, + }, + nodes: []node{ + {properties: nil, shardCount: 2, treeBuilt: true}, + {properties: []property{{group: testGroup1, shard: 0, id: "1", version: 2}}, shardCount: 2, treeBuilt: true}, + }, + propagation: func(nodes []*nodeContext) error { + return nodes[0].messenger.Propagation([]string{nodes[0].nodeID, nodes[1].nodeID}, testGroup1, 0) + }, + result: func(original []node) []node { + // the first node should get the property from the second node + original[0].properties = original[1].properties + return original + }, + }) + }) + + ginkgo.It("gossip two data nodes with client exist but server missing", func() { + startingEachTest(&nodes, ctrl, &testCase{ + groups: []group{ + {name: testGroup1, shardCount: 2, replicasCount: 1}, + }, + nodes: []node{ + {properties: []property{{group: testGroup1, shard: 0, id: "1", version: 2}}, shardCount: 2, treeBuilt: true}, + {properties: nil, shardCount: 2, treeBuilt: true}, + }, + propagation: func(nodes []*nodeContext) error { + return nodes[0].messenger.Propagation([]string{nodes[0].nodeID, nodes[1].nodeID}, testGroup1, 0) + }, + result: func(original []node) []node { + // the second node should get the property from the first node + original[1].properties = original[0].properties + return original + }, + }) + }) + + ginkgo.It("gossip only repair one shard", func() { + startingEachTest(&nodes, ctrl, &testCase{ + groups: []group{ + {name: testGroup1, shardCount: 2, replicasCount: 1}, + }, + nodes: []node{ 
+ {properties: []property{{group: testGroup1, shard: 0, id: "1", version: 1}}, shardCount: 2, treeBuilt: true}, + {properties: []property{{group: testGroup1, shard: 1, id: "2", version: 2}}, treeBuilt: true}, + }, + propagation: func(nodes []*nodeContext) error { + return nodes[0].messenger.Propagation([]string{nodes[0].nodeID, nodes[1].nodeID}, testGroup1, 1) + }, + result: func(original []node) []node { + // the first node should sync the property from the second node + original[0].properties = append(original[0].properties, original[1].properties[0]) + return original + }, + }) + }) + + ginkgo.It("gossip with three nodes", func() { + startingEachTest(&nodes, ctrl, &testCase{ + groups: []group{ + {name: testGroup1, shardCount: 2, replicasCount: 1}, + }, + nodes: []node{ + {properties: []property{{group: testGroup1, shard: 0, id: "1", version: 1}}, treeBuilt: true}, + {properties: []property{{group: testGroup1, shard: 0, id: "1", version: 2}}, treeBuilt: true}, + {properties: []property{{group: testGroup1, shard: 0, id: "1", version: 3}}, treeBuilt: true}, + }, + propagation: func(nodes []*nodeContext) error { + return nodes[0].messenger.Propagation([]string{nodes[0].nodeID, nodes[1].nodeID, nodes[2].nodeID}, testGroup1, 0) + }, + result: func(original []node) []node { + original[0].properties[0] = original[2].properties[0] + original[1].properties[0] = original[2].properties[0] + return original + }, + }) + }) + + ginkgo.It("gossip with three nodes and only one slot", func() { + startingEachTest(&nodes, ctrl, &testCase{ + groups: []group{ + {name: testGroup1, shardCount: 2, replicasCount: 3}, + }, + nodes: []node{ + {properties: []property{{group: testGroup1, shard: 0, id: "2", version: 1}}, treeBuilt: true, treeSlotCount: 1}, + {properties: []property{{group: testGroup1, shard: 0, id: "1", version: 1}}, treeBuilt: true, treeSlotCount: 1}, + {properties: []property{{group: testGroup1, shard: 0, id: "3", version: 1}}, treeBuilt: true, treeSlotCount: 1}, + }, + propagation: func(nodes []*nodeContext) error { + return nodes[0].messenger.Propagation([]string{nodes[0].nodeID, nodes[1].nodeID, nodes[2].nodeID}, testGroup1, 0) + }, + result: func(original []node) []node { + // should all nodes have all properties + original[0].properties = append(original[0].properties, original[1].properties[0]) + original[0].properties = append(original[0].properties, original[2].properties[0]) + + original[1].properties = append(original[1].properties, original[0].properties[0]) + original[1].properties = append(original[1].properties, original[2].properties[0]) + + original[2].properties = append(original[2].properties, original[0].properties[0]) + original[2].properties = append(original[2].properties, original[1].properties[0]) + return original + }, + }) + }) +}) + +type testCase struct { + propagation func(nodes []*nodeContext) error + result func(original []node) []node + groups []group + nodes []node +} + +func startingEachTest(nodes *[]*nodeContext, ctrl *gomock.Controller, c *testCase) { + *nodes = startDataNodes(ctrl, c.nodes, c.groups) + + // adding the wait group the node context, to make sure the gossip server is synced at least once + once := sync.Once{} + leastOnceChannel := make(chan struct{}) + for _, n := range *nodes { + n.clientWrapper.once = &once + n.clientWrapper.c = leastOnceChannel + } + err := c.propagation(*nodes) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + // make sure the gossip server is synced at least once + gomega.Eventually(leastOnceChannel, 
flags.EventuallyTimeout).Should(gomega.BeClosed()) + + original := nodeContextToParentSlice(*nodes) + updatedResult := c.result(original) + for inx, r := range updatedResult { + relatedCtx := (*nodes)[inx] + for _, updatedProperty := range r.properties { + queryPropertyWithVerify(relatedCtx.database, updatedProperty) + } + } +} + +func startDataNodes(ctrl *gomock.Controller, nodes []node, groups []group) []*nodeContext { + result := make([]*nodeContext, 0, len(nodes)) + for _, n := range nodes { + result = append(result, startEachNode(ctrl, n, groups)) + } + + // registering the node in the gossip system + for _, m := range result { + for _, n := range result { + m.messenger.(schema.EventHandler).OnAddOrUpdate(schema.Metadata{ + TypeMeta: schema.TypeMeta{ + Name: n.nodeID, + Kind: schema.KindNode, + }, + Spec: &databasev1.Node{ + Metadata: &commonv1.Metadata{ + Name: n.nodeID, + }, + Roles: []databasev1.Role{databasev1.Role_ROLE_DATA}, + GrpcAddress: n.nodeID, + + PropertyRepairGossipGrpcAddress: n.nodeID, + }, + }) + } + } + return result +} + +func startEachNode(ctrl *gomock.Controller, node node, groups []group) *nodeContext { + if node.treeSlotCount == 0 { + node.treeSlotCount = 32 // default value for tree slot count + } + result := &nodeContext{node: node} + dbLocation, dbLocationDefer, err := test.NewSpace() + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + result.appendStop(dbLocationDefer) + repairLocation, repairLocationDefer, err := test.NewSpace() + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + result.appendStop(repairLocationDefer) + mockGroup := schema.NewMockGroup(ctrl) + groupDefines := make([]*commonv1.Group, 0, len(groups)) + for _, g := range groups { + groupDefines = append(groupDefines, &commonv1.Group{ + Metadata: &commonv1.Metadata{ + Group: g.name, + Name: g.name, + }, + ResourceOpts: &commonv1.ResourceOpts{ + ShardNum: uint32(g.shardCount), + Replicas: uint32(g.replicasCount), + }, + }) + } + mockGroup.EXPECT().ListGroup(gomock.Any()).Return(groupDefines, nil).AnyTimes() + + mockRepo := metadata.NewMockRepo(ctrl) + mockRepo.EXPECT().RegisterHandler("", schema.KindGroup, gomock.Any()).MaxTimes(1) + mockRepo.EXPECT().GroupRegistry().Return(mockGroup).AnyTimes() + + ports, err := test.AllocateFreePorts(1) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + messenger := gossip.NewMessengerWithoutMetadata(observability.NewBypassRegistry(), ports[0]) + addr := fmt.Sprintf("127.0.0.1:%d", ports[0]) + result.nodeID = addr + err = messenger.Validate() + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + messenger.(run.PreRunner).PreRun(context.WithValue(context.Background(), common.ContextNodeKey, common.Node{ + NodeID: addr, + PropertyGossipGrpcAddress: addr, + })) + + ctx := context.WithValue(context.Background(), common.ContextNodeKey, common.Node{ + NodeID: addr, + }) + var db *database + db, err = openDB(ctx, + dbLocation, time.Minute*10, time.Minute*10, int(node.treeSlotCount), observability.NewBypassRegistry(), + fs.NewLocalFileSystem(), true, repairLocation, "@every 10m", time.Minute*10, "* 2 * * *", + messenger, mockRepo, func(context.Context) (string, error) { + snapshotDir, defFunc, newSpaceErr := test.NewSpace() + if newSpaceErr != nil { + return "", newSpaceErr + } + result.appendStop(defFunc) + sList := db.sLst.Load() + var snpError error + for _, s := range *sList { + snpDir := path.Join(snapshotDir, filepath.Base(s.location)) + lfs.MkdirPanicIfExist(snpDir, storage.DirPerm) + if e := s.store.TakeFileSnapshot(snpDir); e != nil { + snpError = 
multierr.Append(snpError, e) + } + } + return snapshotDir, snpError + }) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + result.database = db + + // wrap the server and client in gossip messenger to for getting the sync status + messenger.RegisterServices(result.database.repairScheduler.registerServerToGossip()) + gossipClient := &repairGossipClientWrapper{repairGossipClient: newRepairGossipClient(result.database.repairScheduler)} + result.clientWrapper = gossipClient + messenger.Subscribe(gossipClient) + db.repairScheduler.registerClientToGossip(messenger) + + messenger.Serve(make(chan struct{})) + result.messenger = messenger + + // check gossip server is up + gomega.Eventually(func() error { + conn, connectErr := net.DialTimeout("tcp", addr, time.Second*2) + if connectErr == nil { + _ = conn.Close() + } + return connectErr + }, flags.EventuallyTimeout).Should(gomega.Succeed()) + + result.appendStop(messenger.GracefulStop) + + // initialize shard in to db + for i := int32(0); i < node.shardCount; i++ { + shardID := common.ShardID(i) + _, err = db.loadShard(context.Background(), shardID) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + } + + // adding data to the node + for _, p := range node.properties { + applyPropertyUpdate(db, p) + } + + if node.treeBuilt { + // building the gossip tree for the node + err = db.repairScheduler.buildingTree(nil, "", true) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + } + + return result +} + +type repairGossipClientWrapper struct { + *repairGossipClient + c chan struct{} + once *sync.Once +} + +func (w *repairGossipClientWrapper) Rev(ctx context.Context, nextNode *grpc.ClientConn, request *propertyv1.PropagationRequest) error { + err := w.repairGossipClient.Rev(ctx, nextNode, request) + w.once.Do(func() { + close(w.c) + }) + return err +} + +func applyPropertyUpdate(db *database, p property) { + s, err := db.loadShard(context.Background(), common.ShardID(p.shard)) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + update := &propertyv1.Property{ + Metadata: &commonv1.Metadata{ + Group: p.group, + Name: "test-name", + ModRevision: p.version, + }, + Id: p.id, + } + if p.deleted { + err = s.delete(context.Background(), [][]byte{GetPropertyID(update)}) + } else { + err = s.update(GetPropertyID(update), update) + } + gomega.Expect(err).NotTo(gomega.HaveOccurred()) +} + +func queryPropertyWithVerify(db *database, p property) { + s, err := db.loadShard(context.Background(), common.ShardID(p.shard)) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + query, err := inverted.BuildPropertyQuery(&propertyv1.QueryRequest{ + Groups: []string{p.group}, + Name: "test-name", + Ids: []string{p.id}, + }, groupField, entityID) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + gomega.Eventually(func() *property { + dataList, err := s.search(context.Background(), query, 10) + if err != nil { + return nil + } + var latestData *queryProperty + for _, data := range dataList { + if latestData == nil || data.timestamp > latestData.timestamp { + latestData = data + } + } + if latestData == nil { + return nil + } + return &property{group: p.group, shard: p.shard, id: p.id, version: latestData.timestamp, deleted: latestData.deleteTime > 0} + }, flags.EventuallyTimeout).Should(gomega.Equal(&p)) +} + +type group struct { + name string + shardCount int32 + replicasCount int32 +} + +type node struct { + properties []property + treeBuilt bool + // for the no data scenario, we need to specify the shard count + shardCount int32 + treeSlotCount int32 +} + +type property 
struct { + group string + id string + version int64 + shard int32 + deleted bool +} + +type nodeContext struct { + messenger gossip.Messenger + database *database + clientWrapper *repairGossipClientWrapper + nodeID string + stop []func() + node + stopMutex sync.RWMutex +} + +func (n *nodeContext) appendStop(f func()) { + n.stopMutex.Lock() + defer n.stopMutex.Unlock() + n.stop = append(n.stop, f) +} + +func (n *nodeContext) stopAll() { + n.stopMutex.RLock() + result := make([]func(), 0, len(n.stop)) + result = append(result, n.stop...) + n.stopMutex.RUnlock() + for _, f := range result { + f() + } +} + +func nodeContextToParentSlice(ncs []*nodeContext) []node { + nodes := make([]node, 0, len(ncs)) + for _, nc := range ncs { + nodes = append(nodes, nc.node) + } + return nodes +} diff --git a/banyand/property/repair_test.go b/banyand/property/repair_test.go index 172cd351..bc795b57 100644 --- a/banyand/property/repair_test.go +++ b/banyand/property/repair_test.go @@ -199,8 +199,8 @@ func TestBuildTree(t *testing.T) { } defers = append(defers, snapshotDeferFunc) db, err := openDB(context.Background(), dataDir, 3*time.Second, time.Hour, 32, - observability.BypassRegistry, fs.NewLocalFileSystem(), snapshotDir, - "@every 10m", time.Second*10, func(context.Context) (string, error) { + observability.BypassRegistry, fs.NewLocalFileSystem(), true, snapshotDir, + "@every 10m", time.Second*10, "* 2 * * *", nil, nil, func(context.Context) (string, error) { snapshotDir, defFunc, newSpaceErr := test.NewSpace() if newSpaceErr != nil { return "", newSpaceErr @@ -231,7 +231,7 @@ func TestBuildTree(t *testing.T) { } } - err = newShard.repairState.scheduler.doRepair() + err = newShard.repairState.scheduler.doBuildTree() if err != nil { t.Fatalf("failed to build status: %v", err) } @@ -254,7 +254,7 @@ func TestBuildTree(t *testing.T) { } if tt.nextStatusVerify != nil { - err := newShard.repairState.scheduler.doRepair() + err := newShard.repairState.scheduler.doBuildTree() if err != nil { t.Fatalf("failed to build status after update: %v", err) } @@ -289,8 +289,8 @@ func TestDocumentUpdatesNotify(t *testing.T) { } defers = append(defers, snapshotDeferFunc) db, err := openDB(context.Background(), dataDir, 3*time.Second, time.Hour, 32, - observability.BypassRegistry, fs.NewLocalFileSystem(), snapshotDir, - "@every 10m", time.Millisecond*50, func(context.Context) (string, error) { + observability.BypassRegistry, fs.NewLocalFileSystem(), true, snapshotDir, + "@every 10m", time.Millisecond*50, "* 2 * * *", nil, nil, func(context.Context) (string, error) { tmpDir, defFunc, newSpaceErr := test.NewSpace() if newSpaceErr != nil { return "", newSpaceErr @@ -523,7 +523,7 @@ func (r *repairData) readTree(t *testing.T, group string) *repairTestTree { _ = reader.close() }() - roots, err := reader.read(nil, 10) + roots, err := reader.read(nil, 10, false) if err != nil { t.Fatalf("failed to read tree for group %s: %v", group, err) } @@ -536,7 +536,7 @@ func (r *repairData) readTree(t *testing.T, group string) *repairTestTree { shaValue: roots[0].shaValue, }, } - slots, err := reader.read(roots[0], 10) + slots, err := reader.read(roots[0], 10, false) if err != nil { t.Fatalf("failed to read slots for group %s: %v", group, err) } @@ -545,20 +545,20 @@ func (r *repairData) readTree(t *testing.T, group string) *repairTestTree { } for _, slot := range slots { slotNode := &repairTestTreeNode{ - id: slot.id, + id: fmt.Sprintf("%d", slot.slotInx), shaValue: slot.shaValue, } tree.root.children = append(tree.root.children, slotNode) - 
children, err := reader.read(slot, 10) + children, err := reader.read(slot, 10, false) if err != nil { - t.Fatalf("failed to read children for slot %s in group %s: %v", slot.id, group, err) + t.Fatalf("failed to read children for slot %d in group %s: %v", slot.slotInx, group, err) } if len(children) == 0 { - t.Fatalf("expected at least one child for slot %s in group %s, but got none", slot.id, group) + t.Fatalf("expected at least one child for slot %d in group %s, but got none", slot.slotInx, group) } for _, child := range children { childNode := &repairTestTreeNode{ - id: child.id, + id: child.entity, shaValue: child.shaValue, } slotNode.children = append(slotNode.children, childNode) diff --git a/banyand/property/service.go b/banyand/property/service.go index 2c99a523..bc5892e8 100644 --- a/banyand/property/service.go +++ b/banyand/property/service.go @@ -59,21 +59,23 @@ type service struct { gossipMessenger gossip.Messenger omr observability.MetricsRegistry lfs fs.FileSystem + pm protector.Memory close chan struct{} db *database l *logger.Logger - pm protector.Memory - root string nodeID string + root string snapshotDir string repairDir string repairBuildTreeCron string + repairTriggerCron string flushTimeout time.Duration expireTimeout time.Duration repairQuickBuildTreeTime time.Duration repairTreeSlotCount int maxDiskUsagePercent int maxFileSnapshotNum int + repairEnabled bool } func (s *service) FlagSet() *run.FlagSet { @@ -87,6 +89,8 @@ func (s *service) FlagSet() *run.FlagSet { flagS.StringVar(&s.repairBuildTreeCron, "property-repair-build-tree-cron", "@every 1h", "the cron expression for repairing the build tree") flagS.DurationVar(&s.repairQuickBuildTreeTime, "property-repair-quick-build-tree-time", time.Minute*10, "the duration of the quick build tree after operate the property") + flagS.StringVar(&s.repairTriggerCron, "property-repair-trigger-cron", "* 2 * * *", "the cron expression for background repairing the property data") + flagS.BoolVar(&s.repairEnabled, "property-repair-enabled", false, "whether to enable the background property repair") s.gossipMessenger.FlagSet().VisitAll(func(f *pflag.Flag) { flagS.AddFlag(f) }) @@ -134,31 +138,34 @@ func (s *service) PreRun(ctx context.Context) error { } node := val.(common.Node) s.nodeID = node.NodeID - // if the gossip address is empty, it means that the gossip is not enabled. - if node.PropertyGossipGrpcAddress == "" { - s.gossipMessenger = nil - } var err error snapshotLis := &snapshotListener{s: s} s.db, err = openDB(ctx, filepath.Join(path, storage.DataDir), s.flushTimeout, s.expireTimeout, s.repairTreeSlotCount, s.omr, s.lfs, - s.repairDir, s.repairBuildTreeCron, s.repairQuickBuildTreeTime, func(ctx context.Context) (string, error) { + s.repairEnabled, s.repairDir, s.repairBuildTreeCron, s.repairQuickBuildTreeTime, s.repairTriggerCron, s.gossipMessenger, s.metadata, + func(ctx context.Context) (string, error) { res := snapshotLis.Rev(ctx, bus.NewMessage(bus.MessageID(time.Now().UnixNano()), []*databasev1.SnapshotRequest_Group{})) snpMsg := res.Data().(*databasev1.Snapshot) if snpMsg.Error != "" { return "", errors.New(snpMsg.Error) } - return snpMsg.Name, nil + return filepath.Join(snapshotLis.s.snapshotDir, snpMsg.Name, storage.DataDir), nil }) if err != nil { return err } + // if the gossip address is empty or repair scheduler is not start, it means that the gossip is not enabled. 
+ if node.PropertyGossipGrpcAddress == "" || s.db.repairScheduler == nil { + s.gossipMessenger = nil + } if s.gossipMessenger != nil { if err = s.gossipMessenger.PreRun(ctx); err != nil { return err } + s.gossipMessenger.RegisterServices(s.db.repairScheduler.registerServerToGossip()) + s.db.repairScheduler.registerClientToGossip(s.gossipMessenger) } return multierr.Combine( s.pipeline.Subscribe(data.TopicPropertyUpdate, &updateListener{s: s, path: path, maxDiskUsagePercent: s.maxDiskUsagePercent}), diff --git a/banyand/property/shard.go b/banyand/property/shard.go index ab9059fa..9d5895fe 100644 --- a/banyand/property/shard.go +++ b/banyand/property/shard.go @@ -241,7 +241,9 @@ func (s *shard) updateDocuments(docs index.Documents) error { if persistentError != nil { return fmt.Errorf("persistent failure: %w", persistentError) } - s.repairState.scheduler.documentUpdatesNotify() + if s.repairState.scheduler != nil { + s.repairState.scheduler.documentUpdatesNotify() + } return nil } @@ -286,18 +288,18 @@ func (s *shard) search(ctx context.Context, indexQuery index.Query, limit int, return data, nil } -func (s *shard) repair(ctx context.Context, id []byte, property *propertyv1.Property, deleteTime int64) error { +func (s *shard) repair(ctx context.Context, id []byte, property *propertyv1.Property, deleteTime int64) (updated bool, selfNewer *queryProperty, err error) { iq, err := inverted.BuildPropertyQuery(&propertyv1.QueryRequest{ Groups: []string{property.Metadata.Group}, Name: property.Metadata.Name, Ids: []string{property.Id}, }, groupField, entityID) if err != nil { - return fmt.Errorf("build property query failure: %w", err) + return false, nil, fmt.Errorf("build property query failure: %w", err) } olderProperties, err := s.search(ctx, iq, 100) if err != nil { - return fmt.Errorf("query older properties failed: %w", err) + return false, nil, fmt.Errorf("query older properties failed: %w", err) } sort.Sort(queryPropertySlice(olderProperties)) // if there no older properties, we can insert the latest document. @@ -305,35 +307,41 @@ func (s *shard) repair(ctx context.Context, id []byte, property *propertyv1.Prop var doc *index.Document doc, err = s.buildUpdateDocument(id, property, deleteTime) if err != nil { - return fmt.Errorf("build update document failed: %w", err) + return false, nil, fmt.Errorf("build update document failed: %w", err) + } + err = s.updateDocuments(index.Documents{*doc}) + if err != nil { + return false, nil, fmt.Errorf("update document failed: %w", err) } - return s.updateDocuments(index.Documents{*doc}) + return true, nil, nil } // if the lastest property in shard is bigger than the repaired property, // then the repaired process should be stopped. 
- if olderProperties[len(olderProperties)-1].timestamp > property.Metadata.ModRevision { - return nil + if (olderProperties[len(olderProperties)-1].timestamp > property.Metadata.ModRevision) || + olderProperties[len(olderProperties)-1].timestamp == property.Metadata.ModRevision && + olderProperties[len(olderProperties)-1].deleteTime == deleteTime { + return false, olderProperties[len(olderProperties)-1], nil } docIDList := s.buildNotDeletedDocIDList(olderProperties) deletedDocuments, err := s.buildDeleteFromTimeDocuments(ctx, docIDList, time.Now().UnixNano()) if err != nil { - return fmt.Errorf("build delete older documents failed: %w", err) + return false, nil, fmt.Errorf("build delete older documents failed: %w", err) } // update the property to mark it as delete updateDoc, err := s.buildUpdateDocument(id, property, deleteTime) if err != nil { - return fmt.Errorf("build repair document failure: %w", err) + return false, nil, fmt.Errorf("build repair document failure: %w", err) } result := make([]index.Document, 0, len(deletedDocuments)+1) result = append(result, deletedDocuments...) result = append(result, *updateDoc) err = s.updateDocuments(result) if err != nil { - return fmt.Errorf("update documents failed: %w", err) + return false, nil, fmt.Errorf("update documents failed: %w", err) } - return nil + return true, nil, nil } func (s *shard) buildNotDeletedDocIDList(properties []*queryProperty) [][]byte { diff --git a/banyand/property/shard_test.go b/banyand/property/shard_test.go index 4b986b36..0a331cb4 100644 --- a/banyand/property/shard_test.go +++ b/banyand/property/shard_test.go @@ -102,7 +102,7 @@ func TestMergeDeleted(t *testing.T) { } defers = append(defers, snapshotDeferFunc) db, err := openDB(context.Background(), dataDir, 3*time.Second, tt.expireDeletionTime, 32, observability.BypassRegistry, fs.NewLocalFileSystem(), - snapshotDir, "@every 10m", time.Second*10, nil) + true, snapshotDir, "@every 10m", time.Second*10, "* 2 * * *", nil, nil, nil) if err != nil { t.Fatal(err) } @@ -161,13 +161,13 @@ func TestRepair(t *testing.T) { now := time.Now() tests := []struct { beforeApply func() []*propertyv1.Property - repair func(ctx context.Context, s *shard) error + repair func(ctx context.Context, s *shard) (bool, *queryProperty, error) verify func(t *testing.T, ctx context.Context, s *shard) error name string }{ { name: "repair normal with no properties", - repair: func(ctx context.Context, s *shard) error { + repair: func(ctx context.Context, s *shard) (bool, *queryProperty, error) { property := generateProperty(fmt.Sprintf("test-id%d", 0), now.UnixNano(), 0) return s.repair(ctx, GetPropertyID(property), property, 0) }, @@ -182,7 +182,7 @@ func TestRepair(t *testing.T) { }, { name: "repair deleted with no properties", - repair: func(ctx context.Context, s *shard) error { + repair: func(ctx context.Context, s *shard) (bool, *queryProperty, error) { property := generateProperty(fmt.Sprintf("test-id%d", 0), now.UnixNano(), 0) return s.repair(ctx, GetPropertyID(property), property, now.UnixNano()) }, @@ -203,7 +203,7 @@ func TestRepair(t *testing.T) { } return }, - repair: func(ctx context.Context, s *shard) error { + repair: func(ctx context.Context, s *shard) (bool, *queryProperty, error) { property := generateProperty("test-id3", now.UnixNano(), 1000) return s.repair(ctx, GetPropertyID(property), property, 0) }, @@ -225,7 +225,7 @@ func TestRepair(t *testing.T) { generateProperty("test-id", now.UnixNano()-100, 0), } }, - repair: func(ctx context.Context, s *shard) error { + repair: 
func(ctx context.Context, s *shard) (bool, *queryProperty, error) { property := generateProperty("test-id", now.UnixNano(), 1000) return s.repair(ctx, GetPropertyID(property), property, now.UnixNano()) }, @@ -248,7 +248,7 @@ func TestRepair(t *testing.T) { } return res }, - repair: func(ctx context.Context, s *shard) error { + repair: func(ctx context.Context, s *shard) (bool, *queryProperty, error) { // Create a property with an older mod revision property := generateProperty("test-id", now.UnixNano()-1000, 2000) return s.repair(ctx, GetPropertyID(property), property, 0) @@ -273,7 +273,7 @@ func TestRepair(t *testing.T) { generateProperty("test-id", now.UnixNano(), 0), } }, - repair: func(ctx context.Context, s *shard) error { + repair: func(ctx context.Context, s *shard) (bool, *queryProperty, error) { // Create a property with the same data but marked as deleted property := generateProperty("test-id", now.UnixNano(), 0) return s.repair(ctx, GetPropertyID(property), property, now.UnixNano()) @@ -311,7 +311,7 @@ func TestRepair(t *testing.T) { } defers = append(defers, snapshotDeferFunc) db, err := openDB(context.Background(), dataDir, 3*time.Second, 1*time.Hour, 32, observability.BypassRegistry, fs.NewLocalFileSystem(), - snapshotDir, "@every 10m", time.Second*10, nil) + true, snapshotDir, "@every 10m", time.Second*10, "* 2 * * *", nil, nil, nil) if err != nil { t.Fatal(err) } @@ -333,7 +333,7 @@ func TestRepair(t *testing.T) { } } - if err = tt.repair(context.Background(), newShard); err != nil { + if _, _, err = tt.repair(context.Background(), newShard); err != nil { t.Fatal(err) } diff --git a/docs/api-reference.md b/docs/api-reference.md index 38b3dfb0..dd088e13 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -255,6 +255,21 @@ - [banyandb/property/v1/property.proto](#banyandb_property_v1_property-proto) - [Property](#banyandb-property-v1-Property) +- [banyandb/property/v1/repair.proto](#banyandb_property_v1_repair-proto) + - [DifferTreeSummary](#banyandb-property-v1-DifferTreeSummary) + - [NoMorePropertySync](#banyandb-property-v1-NoMorePropertySync) + - [PropertyMissing](#banyandb-property-v1-PropertyMissing) + - [PropertySync](#banyandb-property-v1-PropertySync) + - [RepairRequest](#banyandb-property-v1-RepairRequest) + - [RepairResponse](#banyandb-property-v1-RepairResponse) + - [RootCompare](#banyandb-property-v1-RootCompare) + - [TreeLeafNode](#banyandb-property-v1-TreeLeafNode) + - [TreeRoot](#banyandb-property-v1-TreeRoot) + - [TreeSlotSHA](#banyandb-property-v1-TreeSlotSHA) + - [TreeSlots](#banyandb-property-v1-TreeSlots) + + - [RepairService](#banyandb-property-v1-RepairService) + - [banyandb/property/v1/rpc.proto](#banyandb_property_v1_rpc-proto) - [ApplyRequest](#banyandb-property-v1-ApplyRequest) - [ApplyResponse](#banyandb-property-v1-ApplyResponse) @@ -3771,6 +3786,7 @@ WriteResponse is the response contract for write | ----- | ---- | ----- | ----------- | | context | [PropagationContext](#banyandb-property-v1-PropagationContext) | | | | group | [string](#string) | | | +| shard_id | [uint32](#uint32) | | | @@ -3840,6 +3856,207 @@ Property stores the user defined data +<a name="banyandb_property_v1_repair-proto"></a> +<p align="right"><a href="#top">Top</a></p> + +## banyandb/property/v1/repair.proto + + + +<a name="banyandb-property-v1-DifferTreeSummary"></a> + +### DifferTreeSummary + + + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| nodes | [TreeLeafNode](#banyandb-property-v1-TreeLeafNode) | repeated | if the 
nodes is empty, mean the server side don't have more tree leaf nodes to send. | + + + + + + +<a name="banyandb-property-v1-NoMorePropertySync"></a> + +### NoMorePropertySync + + + + + + + +<a name="banyandb-property-v1-PropertyMissing"></a> + +### PropertyMissing + + + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| entity | [string](#string) | | | + + + + + + +<a name="banyandb-property-v1-PropertySync"></a> + +### PropertySync + + + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| id | [bytes](#bytes) | | | +| property | [Property](#banyandb-property-v1-Property) | | | +| delete_time | [int64](#int64) | | | + + + + + + +<a name="banyandb-property-v1-RepairRequest"></a> + +### RepairRequest + + + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| tree_root | [TreeRoot](#banyandb-property-v1-TreeRoot) | | compare stage | +| tree_slots | [TreeSlots](#banyandb-property-v1-TreeSlots) | | | +| property_missing | [PropertyMissing](#banyandb-property-v1-PropertyMissing) | | repair stage case 1: client missing but server existing | +| property_sync | [PropertySync](#banyandb-property-v1-PropertySync) | | case 2: client existing but server missing case 3: SHA value mismatches | +| no_more_property_sync | [NoMorePropertySync](#banyandb-property-v1-NoMorePropertySync) | | if client side is already send all the properties(missing or property sync) which means the client side will not sending more properties to sync, server side should close the stream. | + + + + + + +<a name="banyandb-property-v1-RepairResponse"></a> + +### RepairResponse + + + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| root_compare | [RootCompare](#banyandb-property-v1-RootCompare) | | compare stage | +| differ_tree_summary | [DifferTreeSummary](#banyandb-property-v1-DifferTreeSummary) | | | +| property_sync | [PropertySync](#banyandb-property-v1-PropertySync) | | repair stage case 1: return from PropertyMissing case 3: return if the client is older | + + + + + + +<a name="banyandb-property-v1-RootCompare"></a> + +### RootCompare + + + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| tree_found | [bool](#bool) | | | +| root_sha_match | [bool](#bool) | | | + + + + + + +<a name="banyandb-property-v1-TreeLeafNode"></a> + +### TreeLeafNode + + + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| slot_index | [int32](#int32) | | slot_index is the index of the slot in the tree. | +| exists | [bool](#bool) | | if the slot is empty, means the server side don't have the slot. | +| entity | [string](#string) | | if the slot and entity exists, the SHA value of the entity. 
| +| sha | [string](#string) | | | + + + + + + +<a name="banyandb-property-v1-TreeRoot"></a> + +### TreeRoot + + + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| group | [string](#string) | | | +| shard_id | [uint32](#uint32) | | | +| root_sha | [string](#string) | | | + + + + + + +<a name="banyandb-property-v1-TreeSlotSHA"></a> + +### TreeSlotSHA + + + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| slot | [int32](#int32) | | | +| value | [string](#string) | | | + + + + + + +<a name="banyandb-property-v1-TreeSlots"></a> + +### TreeSlots + + + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| slot_sha | [TreeSlotSHA](#banyandb-property-v1-TreeSlotSHA) | repeated | | + + + + + + + + + + + + +<a name="banyandb-property-v1-RepairService"></a> + +### RepairService + + +| Method Name | Request Type | Response Type | Description | +| ----------- | ------------ | ------------- | ------------| +| Repair | [RepairRequest](#banyandb-property-v1-RepairRequest) stream | [RepairResponse](#banyandb-property-v1-RepairResponse) stream | | + + + + + <a name="banyandb_property_v1_rpc-proto"></a> <p align="right"><a href="#top">Top</a></p> diff --git a/docs/concept/property-repair.md b/docs/concept/property-repair.md index 421419fb..1c9c2050 100644 --- a/docs/concept/property-repair.md +++ b/docs/concept/property-repair.md @@ -153,10 +153,9 @@ After each synchronization cycle, the receiving node will send its trace data ba ## Property Repair Based on the Merkel tree and Gossip concept, the system can proceed with the Property Repair process. -This process is scheduled to run on each data node daily at 1:00 AM(it's configurable as `property-background-repair-cron` flag), and follows these steps: -1. **Select a Group**: The node retrieves a list of Property groups where the number of **replicas is greater than or equal to 2**, and randomly selects one group for repair. +This process is scheduled to run on each data node daily at 2:00 AM(it's configurable as `property-background-repair-cron` flag), and follows these steps: +1. **Select a Group and Select a Shard**: The node retrieves a list of Property groups where the number of **replicas is greater than or equal to 2**, and randomly selects one group for repair, then randomly selects a shard within that group. 2. **Query Node List**: Then determines the list of nodes that hold replicas for the selected group and sends the gossip propagation message to those nodes to synchronize the Property data for that group. -3. **Wait for the Result**: The initiating node waits for the final result of the synchronization process before proceeding. ### Property Synchronize between Two Nodes @@ -165,20 +164,22 @@ Let’s refer to the current node as A and the target node as B. The process is 1. **(A)Establish Connection**: Node A initiates a **bidirectional streaming connection** with node B to enable real-time, two-way data transfer. 2. **(A)Iterate Over Shards**: Node A retrieves the list of all Property-related shards for the selected group and processes them one by one. -3. **(A)Send Merkle Tree Summary**: For each shard, node A reads its Merkle Tree and sends a summary (including root SHA and slots SHA) to node B. +3. **(A)Send Merkle Tree Root**: For each shard, node A reads its Merkle Tree and sends root SHA to node B. This allows B to quickly identify which slots may contain differences. -4. 
**(B)Verify Merkle Tree Summary and Respond**: Node B compares the received summary against its own Merkle Tree for the same shard and group: - * If the root SHA matches, node B returns an empty slot list, indicating no differences. - * If the root SHA differ, node B checks the slot SHA, identifies mismatched slots, and sends back all relevant leaf node details, including the **slot index**, **entity**, and **SHA value**. -5. **(A)Compare Leaf Data**: Node A processes the received leaf data and takes the following actions: +4. **(B)Receive Merkle Tree Root**: Node B receives the root SHA from node A and responds with whether its own root SHA value matches. +5. **(A)Send Slot Summary**: If the root SHA matches, node A finishes the synchronization for that shard and moves to the next one. + If the root SHA differs, node A sends a summary of its slots, including the **slot index** and **SHA value** of each slot, so that node B can identify the slots that differ. +6. **(B)Verify Merkle Tree Summary and Respond**: Node B compares the received summary against its own Merkle Tree for the same shard and group, +identifies mismatched slots, and sends back all relevant leaf node details, including the **slot index**, **entity**, and **SHA value**. +7. **(A)Compare Leaf Data**: Node A processes the received leaf data and takes the following actions: * For missing entities (present on B but not on A), A requests the full Property data from B. * For entities present on A, but not on B, A sends the full Property data to B. * For SHA mismatches, A sends its full Property data to B for validation. -6. **(B)Validate Actual Data**: Node B handles the data as follows: +8. **(B)Validate Actual Data**: Node B handles the data as follows: * For missing entities, B returns the latest version of the data. * For entities present on A, but not on B, B updates its local copy with the data from A. * For SHA mismatches, B uses the "last-write-win" strategy. It compares the version numbers. If B’s version is newer, it returns the Property data to A. If A’s version is newer, B updates its local copy and does not return any data. If the versions are the same, it selects the data from the smaller index of the node list; in this case, it would be from A. -7. **(A)Update Local Data**: Node A receives updated Property data from B and applies the changes to its local store. +9. **(A)Update Local Data**: Node A receives updated Property data from B and applies the changes to its local store. This concludes the A-to-B synchronization cycle for a given group in the Property repair process. @@ -186,7 +187,9 @@ This concludes the A-to-B synchronization cycle for a given group in the Propert During the synchronization process, the system will terminate or skip processing under the following scenarios: -1. **Merkle Tree Not Built**: If either node A or node B has not yet built the Merkle Tree for the target group, the gossip protocol is immediately terminated. +1. **Merkle Tree Not Built**: + 1. If node A (the client) has not yet built the Merkle Tree for the target group, the gossip protocol is immediately terminated. + 2. If node B (the server) has not yet built the Merkle Tree for the target group, the client skips the current server and tries the next available node to continue the gossip protocol. 2. **Duplicate Sync Request**: If a new gossip sync request for the same group is received by either node while an existing synchronization is in progress, the new request is terminated to avoid conflicts. 3. 
**Target Node Connection Failure Handling**: When the current node fails to connect to the target node, the following fallback logic applies: 1. **Target Not in Connection Pool**: Skip the target, decrement the max round count by 1, and attempt the next available node. Trace information is recorded. diff --git a/pkg/index/inverted/query.go b/pkg/index/inverted/query.go index 9812d253..12deb057 100644 --- a/pkg/index/inverted/query.go +++ b/pkg/index/inverted/query.go @@ -683,6 +683,25 @@ func BuildPropertyQuery(req *propertyv1.QueryRequest, groupField, idField string }, nil } +// BuildPropertyQueryFromEntity builds a property query from entity information. +func BuildPropertyQueryFromEntity(groupField, group, name, entityIDField, entityID string) (index.Query, error) { + if group == "" || name == "" || entityID == "" { + return nil, errors.New("group, name and entityID are mandatory for property query") + } + bq := bluge.NewBooleanQuery() + bn := newMustNode() + bq.AddMust(bluge.NewTermQuery(group).SetField(groupField)) + bn.Append(newTermNode(group, nil)) + bq.AddMust(bluge.NewTermQuery(name).SetField(index.IndexModeName)) + bn.Append(newTermNode(name, nil)) + bq.AddMust(bluge.NewTermQuery(entityID).SetField(entityIDField)) + bn.Append(newTermNode(entityID, nil)) + return &queryNode{ + query: bq, + node: bn, + }, nil +} + var ( _ logical.Schema = (*schema)(nil) schemaInstance = &schema{}
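As a quick illustration of the new helper added in pkg/index/inverted/query.go above, the following minimal sketch shows how BuildPropertyQueryFromEntity might be called. The field names and values here are placeholders for the example only, not the constants the property store actually uses.

```go
package example

import (
	"log"

	"github.com/apache/skywalking-banyandb/pkg/index/inverted"
)

func buildRepairLookup() {
	// Placeholder field names and values; real callers pass the property
	// store's own group and entity-ID field constants.
	q, err := inverted.BuildPropertyQueryFromEntity("group", "default", "service_topology", "_entity_id", "svc-1")
	if err != nil {
		// group, name, and entityID are all mandatory.
		log.Fatalf("build property query: %v", err)
	}
	_ = q // hand q to the shard's inverted-index search (omitted here)
}
```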
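For readers following the compare stage described in the docs/concept/property-repair.md change, here is a minimal client-side sketch of that stage over the RepairService stream. It assumes the standard protoc-gen-go/grpc stubs generated from repair.proto (wrapper names such as RepairRequest_TreeRoot follow the usual generator conventions and are not taken from banyand/property/repair_gossip.go), and it omits the repair stage that follows the comparison.

```go
package example

import (
	"context"
	"io"

	propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
)

// compareShard runs the compare stage for a single shard: send the Merkle
// tree root, and only when the roots differ, send the per-slot SHA summary
// and collect the differing leaf nodes the server reports back.
func compareShard(ctx context.Context, client propertyv1.RepairServiceClient,
	group string, shardID uint32, rootSHA string, slots []*propertyv1.TreeSlotSHA,
) ([]*propertyv1.TreeLeafNode, error) {
	stream, err := client.Repair(ctx)
	if err != nil {
		return nil, err
	}
	defer stream.CloseSend()
	// Step 3: send the tree root of the shard.
	if err := stream.Send(&propertyv1.RepairRequest{Data: &propertyv1.RepairRequest_TreeRoot{
		TreeRoot: &propertyv1.TreeRoot{Group: group, ShardId: shardID, RootSha: rootSHA},
	}}); err != nil {
		return nil, err
	}
	// Step 4: the server replies with RootCompare.
	resp, err := stream.Recv()
	if err != nil {
		return nil, err
	}
	rc := resp.GetRootCompare()
	if rc == nil || !rc.TreeFound || rc.RootShaMatch {
		// Tree missing on the server, or roots already match: nothing to compare here.
		return nil, nil
	}
	// Step 5: roots differ, send the slot SHA summary.
	if err := stream.Send(&propertyv1.RepairRequest{Data: &propertyv1.RepairRequest_TreeSlots{
		TreeSlots: &propertyv1.TreeSlots{SlotSha: slots},
	}}); err != nil {
		return nil, err
	}
	// Step 6: collect DifferTreeSummary batches; an empty batch (or EOF) ends the stage.
	var differing []*propertyv1.TreeLeafNode
	for {
		resp, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			return nil, err
		}
		summary := resp.GetDifferTreeSummary()
		if summary == nil || len(summary.Nodes) == 0 {
			break
		}
		differing = append(differing, summary.Nodes...)
	}
	return differing, nil
}
```

The empty DifferTreeSummary batch is what lets the client move on to the next shard, mirroring the "no more tree leaf nodes to send" note in the API reference above.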