This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 3267b041 Support background property repair (#712)
3267b041 is described below

commit 3267b0418558912ef1a28f48666336d6e7bc2c0d
Author: mrproliu <741550...@qq.com>
AuthorDate: Mon Aug 4 07:19:53 2025 +0800

    Support background property repair (#712)
    
    * Support background property repair
---
 api/proto/banyandb/property/v1/gossip.proto |   5 +-
 api/proto/banyandb/property/v1/repair.proto | 105 ++++
 banyand/property/db.go                      |  33 +-
 banyand/property/gossip/client.go           |  17 +-
 banyand/property/gossip/message.go          |  12 +-
 banyand/property/gossip/server.go           |  89 ++-
 banyand/property/gossip/service.go          |  62 +-
 banyand/property/gossip/service_test.go     |  19 +-
 banyand/property/repair.go                  | 303 ++++++++--
 banyand/property/repair_gossip.go           | 891 ++++++++++++++++++++++++++++
 banyand/property/repair_gossip_test.go      | 581 ++++++++++++++++++
 banyand/property/repair_test.go             |  26 +-
 banyand/property/service.go                 |  23 +-
 banyand/property/shard.go                   |  32 +-
 banyand/property/shard_test.go              |  20 +-
 docs/api-reference.md                       | 217 +++++++
 docs/concept/property-repair.md             |  25 +-
 pkg/index/inverted/query.go                 |  19 +
 18 files changed, 2306 insertions(+), 173 deletions(-)

diff --git a/api/proto/banyandb/property/v1/gossip.proto 
b/api/proto/banyandb/property/v1/gossip.proto
index 9a23c02c..6bae7523 100644
--- a/api/proto/banyandb/property/v1/gossip.proto
+++ b/api/proto/banyandb/property/v1/gossip.proto
@@ -30,8 +30,9 @@ message PropagationContext {
 }
 
 message PropagationRequest {
-  PropagationContext context = 2;
-  string group = 3;
+  PropagationContext context = 1;
+  string group = 2;
+  uint32 shard_id = 3;
 }
 
 message PropagationResponse {}
diff --git a/api/proto/banyandb/property/v1/repair.proto 
b/api/proto/banyandb/property/v1/repair.proto
new file mode 100644
index 00000000..24510ed9
--- /dev/null
+++ b/api/proto/banyandb/property/v1/repair.proto
@@ -0,0 +1,105 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax = "proto3";
+
+package banyandb.property.v1;
+
+import "banyandb/property/v1/property.proto";
+
+option go_package = "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1";
+option java_package = "org.apache.skywalking.banyandb.property.v1";
+
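+// TreeSlotSHA carries the aggregated SHA value of one slot of the repair Merkle tree.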
+message TreeSlotSHA {
+  int32 slot = 1;
+  string value = 2;
+}
+
+message TreeLeafNode {
+  // slot_index is the index of the slot in the tree.
+  int32 slot_index = 1;
+  // if exists is false, the server side does not have this slot.
+  bool exists = 2;
+  // if the slot exists, entity and sha carry the entity key and its SHA value.
+  string entity = 3;
+  string sha = 4;
+}
+
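+// TreeRoot starts the compare stage by announcing the root SHA of the sender's tree for a group and shard.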
+message TreeRoot {
+  string group = 1;
+  uint32 shard_id = 2;
+  string root_sha = 3;
+}
+
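+// TreeSlots lists the slot SHA values whose subtrees should be compared.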
+message TreeSlots {
+  repeated TreeSlotSHA slot_sha = 1;
+}
+
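+// PropertyMissing asks the server to send back the property for an entity that the client is missing.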
+message PropertyMissing {
+  string entity = 1;
+}
+
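+// RootCompare answers a TreeRoot request, telling whether the tree was found and whether the root SHA matches.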
+message RootCompare {
+  bool tree_found = 1;
+  bool root_sha_match = 2;
+}
+
+message DifferTreeSummary {
+  // if nodes is empty, the server side has no more tree leaf nodes to send.
+  repeated TreeLeafNode nodes = 2;
+}
+
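+// PropertySync carries the full property data (or a tombstone via delete_time) used to repair one side.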
+message PropertySync {
+  bytes id = 1;
+  banyandb.property.v1.Property property = 2;
+  int64 delete_time = 3;
+}
+
+message NoMorePropertySync {}
+
+message RepairRequest {
+  oneof data {
+    // compare stage
+    TreeRoot tree_root = 1;
+    TreeSlots tree_slots = 2;
+    // repair stage
+    // case 1: client missing but server existing
+    PropertyMissing property_missing = 3;
+    // case 2: client existing but server missing
+    // case 3: SHA value mismatches
+    PropertySync property_sync = 4;
+    // if the client side has already sent all the properties (missing or property sync),
+    // meaning it will not send any more properties to sync, the server side should close the stream.
+    NoMorePropertySync no_more_property_sync = 5;
+  }
+}
+
+message RepairResponse {
+  oneof data {
+    // compare stage
+    RootCompare root_compare = 1;
+    DifferTreeSummary differ_tree_summary = 2;
+    // repair stage
+    // case 1: returned in response to PropertyMissing
+    // case 3: returned when the client side is older
+    PropertySync property_sync = 3;
+  }
+}
+
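+// RepairService drives the background repair between two nodes over a single
+// bidirectional stream. A typical exchange, assuming the root SHA values differ
+// (client sends ->, server answers <-), is:
+//   -> TreeRoot                <- RootCompare
+//   -> TreeSlots               <- DifferTreeSummary (repeated until nodes is empty)
+//   -> PropertyMissing or PropertySync per differing entity   <- PropertySync (when the client side is older)
+//   -> NoMorePropertySync, after which the server closes the stream.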
+service RepairService {
+  rpc Repair(stream RepairRequest) returns (stream RepairResponse) {}
+}
diff --git a/banyand/property/db.go b/banyand/property/db.go
index 14a5b1ea..ea5475ab 100644
--- a/banyand/property/db.go
+++ b/banyand/property/db.go
@@ -32,7 +32,9 @@ import (
        "github.com/apache/skywalking-banyandb/api/common"
        propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/banyand/property/gossip"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/index/inverted"
        "github.com/apache/skywalking-banyandb/pkg/logger"
@@ -48,19 +50,20 @@ var (
 )
 
 type database struct {
-       lock                fs.File
+       metadata            metadata.Repo
        omr                 observability.MetricsRegistry
+       lfs                 fs.FileSystem
+       lock                fs.File
        logger              *logger.Logger
        repairScheduler     *repairScheduler
-       lfs                 fs.FileSystem
        sLst                atomic.Pointer[[]*shard]
        location            string
        repairBaseDir       string
        flushInterval       time.Duration
        expireDelete        time.Duration
        repairTreeSlotCount int
-       closed              atomic.Bool
        mu                  sync.RWMutex
+       closed              atomic.Bool
 }
 
 func openDB(
@@ -71,9 +74,13 @@ func openDB(
        repairSlotCount int,
        omr observability.MetricsRegistry,
        lfs fs.FileSystem,
+       repairEnabled bool,
        repairBaseDir string,
        repairBuildTreeCron string,
        repairQuickBuildTreeTime time.Duration,
+       repairTriggerCron string,
+       gossipMessenger gossip.Messenger,
+       metadata metadata.Repo,
        buildSnapshotFunc func(context.Context) (string, error),
 ) (*database, error) {
        loc := filepath.Clean(location)
@@ -89,13 +96,18 @@ func openDB(
                repairTreeSlotCount: repairSlotCount,
                repairBaseDir:       repairBaseDir,
                lfs:                 lfs,
+               metadata:            metadata,
        }
+       var err error
        // init repair scheduler
-       scheduler, err := newRepairScheduler(l, omr, repairBuildTreeCron, 
repairQuickBuildTreeTime, db, buildSnapshotFunc)
-       if err != nil {
-               return nil, errors.Wrapf(err, "failed to create repair 
scheduler for %s", loc)
+       if repairEnabled {
+               scheduler, schedulerErr := newRepairScheduler(l, omr, repairBuildTreeCron, repairQuickBuildTreeTime, repairTriggerCron,
+                       gossipMessenger, repairSlotCount, db, buildSnapshotFunc)
+               if schedulerErr != nil {
+                       return nil, errors.Wrapf(schedulerErr, "failed to create repair scheduler for %s", loc)
+               }
+               db.repairScheduler = scheduler
        }
-       db.repairScheduler = scheduler
        if err = db.load(ctx); err != nil {
                return nil, err
        }
@@ -211,7 +223,9 @@ func (db *database) close() error {
        if db.closed.Swap(true) {
                return nil
        }
-       db.repairScheduler.close()
+       if db.repairScheduler != nil {
+               db.repairScheduler.close()
+       }
        sLst := db.sLst.Load()
        var err error
        if sLst != nil {
@@ -241,7 +255,8 @@ func (db *database) repair(ctx context.Context, id []byte, 
shardID uint64, prope
        if err != nil {
                return errors.WithMessagef(err, "failed to load shard %d", id)
        }
-       return s.repair(ctx, id, property, deleteTime)
+       _, _, err = s.repair(ctx, id, property, deleteTime)
+       return err
 }
 
 type walkFn func(suffix string) error
diff --git a/banyand/property/gossip/client.go 
b/banyand/property/gossip/client.go
index 3a501d22..41da6e70 100644
--- a/banyand/property/gossip/client.go
+++ b/banyand/property/gossip/client.go
@@ -26,7 +26,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
 )
 
-func (s *service) Propagation(nodes []string, group string) error {
+func (s *service) Propagation(nodes []string, group string, shardID uint32) error {
        if len(nodes) < 2 {
                return fmt.Errorf("must provide at least 2 node")
        }
@@ -45,6 +45,7 @@ func (s *service) Propagation(nodes []string, group string) 
error {
        request := &propertyv1.PropagationRequest{
                Context: ctx,
                Group:   group,
+               ShardId: shardID,
        }
 
        var sendTo func(context.Context, *propertyv1.PropagationRequest) 
(*propertyv1.PropagationResponse, error)
@@ -84,6 +85,18 @@ func (s *service) Propagation(nodes []string, group string) 
error {
        return nil
 }
 
+func (s *service) LocateNodes(group string, shardID, replicasCount uint32) ([]string, error) {
+       result := make([]string, 0, replicasCount)
+       for r := range replicasCount {
+               node, err := s.sel.Pick(group, "", shardID, r)
+               if err != nil {
+                       return nil, fmt.Errorf("failed to locate node for group %s, shardID %d, replica %d: %w", group, shardID, r, err)
+               }
+               result = append(result, node)
+       }
+       return result, nil
+}
+
 func (s *service) getRegisteredNode(id string) (*databasev1.Node, bool) {
        s.mu.RLock()
        defer s.mu.RUnlock()
@@ -100,6 +113,7 @@ func (s *service) OnAddOrUpdate(md schema.Metadata) {
                s.log.Warn().Msg("invalid metadata type")
                return
        }
+       s.sel.AddNode(node)
        address := node.PropertyRepairGossipGrpcAddress
        if address == "" {
                s.log.Warn().Stringer("node", node).Msg("node does not have 
gossip address, skipping registration")
@@ -132,6 +146,7 @@ func (s *service) OnDelete(md schema.Metadata) {
                s.log.Warn().Stringer("node", node).Msg("node does not have a 
name, skipping deregistration")
                return
        }
+       s.sel.RemoveNode(node)
 
        s.mu.Lock()
        defer s.mu.Unlock()
diff --git a/banyand/property/gossip/message.go 
b/banyand/property/gossip/message.go
index 4ec1b167..fe26cd1c 100644
--- a/banyand/property/gossip/message.go
+++ b/banyand/property/gossip/message.go
@@ -20,12 +20,16 @@ package gossip
 import (
        "context"
 
+       "github.com/pkg/errors"
        "google.golang.org/grpc"
 
        propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
+// ErrAbortPropagation is an error that indicates the gossip propagation of a message should be aborted.
+var ErrAbortPropagation = errors.New("abort propagation")
+
 // MessageListener is an interface that defines a method to handle the 
incoming propagation message.
 type MessageListener interface {
        Rev(ctx context.Context, nextNode *grpc.ClientConn, request 
*propertyv1.PropagationRequest) error
@@ -48,14 +52,18 @@ type Messenger interface {
 type MessageClient interface {
        run.Unit
        // Propagation using anti-entropy gossip protocol to propagate messages 
to the specified nodes.
-       Propagation(nodes []string, topic string) error
+       Propagation(nodes []string, group string, shardID uint32) error
+       // LocateNodes finds nodes in the specified group and shard ID, returning a list of node addresses.
+       LocateNodes(group string, shardID, replicasCount uint32) ([]string, error)
 }
 
 // MessageServer is an interface that defines methods for subscribing to 
topics and receiving messages in a gossip protocol.
 type MessageServer interface {
        run.Unit
        // Subscribe allows subscribing to a topic to receive messages.
-       Subscribe(listener MessageListener) error
+       Subscribe(listener MessageListener)
+       // RegisterServices registers the gRPC services with the provided server.
+       RegisterServices(func(r *grpc.Server))
        // GetServerPort returns the port number of the server.
        GetServerPort() *uint32
 }
diff --git a/banyand/property/gossip/server.go 
b/banyand/property/gossip/server.go
index ff943ada..c7e7ac34 100644
--- a/banyand/property/gossip/server.go
+++ b/banyand/property/gossip/server.go
@@ -49,13 +49,21 @@ var (
              "RetryableStatusCodes": [ "UNAVAILABLE" ]
          }
        }]}`, serviceName)
+
+       // perNodeSyncTimeout is the timeout for each node to sync the property data.
+       perNodeSyncTimeout = time.Minute * 10
 )
 
-func (s *service) Subscribe(listener MessageListener) error {
+func (s *service) Subscribe(listener MessageListener) {
        s.listenersLock.Lock()
        defer s.listenersLock.Unlock()
        s.listeners = append(s.listeners, listener)
-       return nil
+}
+
+func (s *service) RegisterServices(f func(r *grpc.Server)) {
+       s.serviceRegisterLock.Lock()
+       defer s.serviceRegisterLock.Unlock()
+       s.serviceRegister = append(s.serviceRegister, f)
 }
 
 func (s *service) getListener() MessageListener {
@@ -67,7 +75,16 @@ func (s *service) getListener() MessageListener {
        return s.listeners[0]
 }
 
-type groupPropagation struct {
+func (s *service) getServiceRegisters() []func(server *grpc.Server) {
+       s.serviceRegisterLock.RLock()
+       defer s.serviceRegisterLock.RUnlock()
+       if len(s.serviceRegister) == 0 {
+               return nil
+       }
+       return s.serviceRegister
+}
+
+type groupWithShardPropagation struct {
        latestTime     time.Time
        channel        chan *propertyv1.PropagationRequest
        originalNodeID string
@@ -75,17 +92,17 @@ type groupPropagation struct {
 
 type protocolHandler struct {
        propertyv1.UnimplementedGossipServiceServer
-       s           *service
-       groups      map[string]*groupPropagation
-       groupNotify chan struct{}
-       mu          sync.RWMutex
+       s               *service
+       groupWithShards map[string]*groupWithShardPropagation
+       groupNotify     chan struct{}
+       mu              sync.RWMutex
 }
 
 func newProtocolHandler(s *service) *protocolHandler {
        return &protocolHandler{
-               s:           s,
-               groups:      make(map[string]*groupPropagation),
-               groupNotify: make(chan struct{}, 10),
+               s:               s,
+               groupWithShards: make(map[string]*groupWithShardPropagation),
+               groupNotify:     make(chan struct{}, 10),
        }
 }
 
@@ -98,7 +115,9 @@ func (q *protocolHandler) processPropagation() {
                        if request == nil {
                                continue
                        }
-                       err := q.handle(q.s.closer.Ctx(), request)
+                       timeoutCtx, cancelFunc := context.WithTimeout(q.s.closer.Ctx(), perNodeSyncTimeout)
+                       err := q.handle(timeoutCtx, request)
+                       cancelFunc()
                        if err != nil {
                                q.s.log.Warn().Err(err).Stringer("request", 
request).
                                        Msgf("handle propagation request 
failure")
@@ -112,7 +131,7 @@ func (q *protocolHandler) processPropagation() {
 func (q *protocolHandler) findUnProcessRequest() 
*propertyv1.PropagationRequest {
        q.mu.Lock()
        defer q.mu.Unlock()
-       for _, g := range q.groups {
+       for _, g := range q.groupWithShards {
                select {
                case d := <-g.channel:
                        return d
@@ -141,6 +160,7 @@ func (q *protocolHandler) handle(ctx context.Context, 
request *propertyv1.Propag
        now := n.UnixNano()
        nodes := request.Context.Nodes
        q.s.serverMetrics.totalStarted.Inc(1, request.Group)
+       q.s.log.Debug().Stringer("request", request).Msgf("handling gossip message for propagation")
        var needsKeepPropagation bool
        defer func() {
                if err != nil {
@@ -175,6 +195,11 @@ func (q *protocolHandler) handle(ctx context.Context, 
request *propertyv1.Propag
                // process the message using the listener
                err = listener.Rev(ctx, nextNodeConn, request)
                if err != nil {
+                       if errors.Is(err, ErrAbortPropagation) {
+                               q.s.log.Warn().Err(err).Msgf("propagation aborted by listener for node: %s", nextNodeID)
+                               _ = nextNodeConn.Close()
+                               return nil // Abort propagation, no need to continue
+                       }
                        q.s.log.Warn().Err(err).Msgf("failed to process with 
next node: %s", nextNodeID)
                        _ = nextNodeConn.Close()
                        continue
@@ -199,6 +224,14 @@ func (q *protocolHandler) handle(ctx context.Context, 
request *propertyv1.Propag
                return nil
        }
 
+       if q.contextIsDone(ctx) {
+               if nextNodeConn != nil {
+                       _ = nextNodeConn.Close()
+               }
+               q.s.log.Debug().Msgf("context is done, no need to propagate further")
+               return nil
+       }
+
        // propagate the message to the next node
        needsKeepPropagation = true
        q.s.serverMetrics.totalSendToNextStarted.Inc(1, request.Group)
@@ -217,42 +250,52 @@ func (q *protocolHandler) handle(ctx context.Context, 
request *propertyv1.Propag
        return nil
 }
 
+func (q *protocolHandler) contextIsDone(ctx context.Context) bool {
+       select {
+       case <-ctx.Done():
+               return true
+       default:
+               return false
+       }
+}
+
 func (q *protocolHandler) addToProcess(request *propertyv1.PropagationRequest) 
bool {
        q.mu.Lock()
        defer q.mu.Unlock()
 
-       group, exist := q.groups[request.Group]
+       shardKey := fmt.Sprintf("%s_%d", request.Group, request.ShardId)
+       groupShard, exist := q.groupWithShards[shardKey]
        if !exist {
-               group = &groupPropagation{
+               groupShard = &groupWithShardPropagation{
                        channel:        make(chan 
*propertyv1.PropagationRequest, 1),
                        originalNodeID: request.Context.OriginNode,
                        latestTime:     time.Now(),
                }
-               group.channel <- request
-               q.groups[request.Group] = group
+               groupShard.channel <- request
+               q.groupWithShards[shardKey] = groupShard
                q.notifyNewRequest()
                return true
        }
 
        // if the latest round is out of ttl, then needs to change to current 
node to executing
-       if time.Since(group.latestTime) > q.s.scheduleInterval/2 {
-               group.originalNodeID = request.Context.OriginNode
+       if time.Since(groupShard.latestTime) > q.s.scheduleInterval/2 {
+               groupShard.originalNodeID = request.Context.OriginNode
                select {
-               case group.channel <- request:
+               case groupShard.channel <- request:
                        q.notifyNewRequest()
                default:
-                       q.s.log.Error().Msgf("ready to added propagation into 
group %s in a new round, but it's full", request.Group)
+                       q.s.log.Error().Msgf("ready to add propagation into group shard %s(%d) in a new round, but it's full", request.Group, request.ShardId)
                }
                return true
        }
 
        // if the original node ID are a same node, means which from the same 
round
-       if group.originalNodeID == request.Context.OriginNode {
+       if groupShard.originalNodeID == request.Context.OriginNode {
                select {
-               case group.channel <- request:
+               case groupShard.channel <- request:
                        q.notifyNewRequest()
                default:
-                       q.s.log.Error().Msgf("ready to added propagation into 
group %s in a same round, but it's full", request.Group)
+                       q.s.log.Error().Msgf("ready to add propagation into group shard %s(%d) in the same round, but it's full", request.Group, request.ShardId)
                }
                return true
        }
diff --git a/banyand/property/gossip/service.go 
b/banyand/property/gossip/service.go
index a52319ee..625b5ff7 100644
--- a/banyand/property/gossip/service.go
+++ b/banyand/property/gossip/service.go
@@ -41,6 +41,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/node"
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
@@ -60,31 +61,32 @@ const (
 
 type service struct {
        schema.UnimplementedOnInitHandler
-       metadata        metadata.Repo
-       creds           credentials.TransportCredentials
-       omr             observability.MetricsRegistry
-       registered      map[string]*databasev1.Node
-       ser             *grpclib.Server
-       serverMetrics   *serverMetrics
-       log             *logger.Logger
-       closer          *run.Closer
-       protocolHandler *protocolHandler
-       keyFile         string
-       addr            string
-       certFile        string
-       host            string
-       nodeID          string
-       caCertPath      string
-       listeners       []MessageListener
-       maxRecvMsgSize  run.Bytes
-       listenersLock   sync.RWMutex
-       mu              sync.RWMutex
-       port            uint32
-       tls             bool
-
-       totalTimeout time.Duration
-       // TODO: should pass by property repair configuration
-       scheduleInterval time.Duration
+       metadata            metadata.Repo
+       creds               credentials.TransportCredentials
+       omr                 observability.MetricsRegistry
+       sel                 node.Selector
+       registered          map[string]*databasev1.Node
+       ser                 *grpclib.Server
+       serverMetrics       *serverMetrics
+       log                 *logger.Logger
+       closer              *run.Closer
+       protocolHandler     *protocolHandler
+       certFile            string
+       keyFile             string
+       host                string
+       nodeID              string
+       caCertPath          string
+       addr                string
+       listeners           []MessageListener
+       serviceRegister     []func(s *grpclib.Server)
+       maxRecvMsgSize      run.Bytes
+       totalTimeout        time.Duration
+       scheduleInterval    time.Duration
+       serviceRegisterLock sync.RWMutex
+       mu                  sync.RWMutex
+       listenersLock       sync.RWMutex
+       port                uint32
+       tls                 bool
 }
 
 // NewMessenger creates a new instance of Messenger for gossip propagation 
communication between nodes.
@@ -99,12 +101,15 @@ func NewMessenger(omr observability.MetricsRegistry, 
metadata metadata.Repo) Mes
                listeners:        make([]MessageListener, 0),
                registered:       make(map[string]*databasev1.Node),
                scheduleInterval: time.Hour * 2,
+               sel:              node.NewRoundRobinSelector("", metadata),
        }
 }
 
 // NewMessengerWithoutMetadata creates a new instance of Messenger without 
metadata.
-func NewMessengerWithoutMetadata(omr observability.MetricsRegistry) Messenger {
-       return NewMessenger(omr, nil)
+func NewMessengerWithoutMetadata(omr observability.MetricsRegistry, port int) Messenger {
+       messenger := NewMessenger(omr, nil)
+       messenger.(*service).port = uint32(port)
+       return messenger
 }
 
 func (s *service) PreRun(ctx context.Context) error {
@@ -207,6 +212,9 @@ func (s *service) Serve(stopCh chan struct{}) {
        s.ser = grpclib.NewServer(opts...)
 
        propertyv1.RegisterGossipServiceServer(s.ser, s.protocolHandler)
+       for _, register := range s.getServiceRegisters() {
+               register(s.ser)
+       }
        var wg sync.WaitGroup
        wg.Add(1)
        go func() {
diff --git a/banyand/property/gossip/service_test.go 
b/banyand/property/gossip/service_test.go
index a36f3a38..82fb4259 100644
--- a/banyand/property/gossip/service_test.go
+++ b/banyand/property/gossip/service_test.go
@@ -68,7 +68,7 @@ var _ = ginkgo.Describe("Propagation Messenger", func() {
                nodes = startNodes(1)
                node := nodes[0]
 
-               err := node.messenger.Propagation([]string{node.nodeID}, "test")
+               err := node.messenger.Propagation([]string{node.nodeID}, 
"test", 0)
                gomega.Expect(err).To(gomega.HaveOccurred())
        })
 
@@ -76,7 +76,7 @@ var _ = ginkgo.Describe("Propagation Messenger", func() {
                nodes = startNodes(2)
                node1, node2 := nodes[0], nodes[1]
 
-               err := node1.messenger.Propagation([]string{node1.nodeID, 
node2.nodeID}, mockGroup)
+               err := node1.messenger.Propagation([]string{node1.nodeID, 
node2.nodeID}, mockGroup, 0)
                gomega.Expect(err).NotTo(gomega.HaveOccurred())
                nodeVerify(node1, []string{node2.nodeID}, 1)
        })
@@ -85,7 +85,7 @@ var _ = ginkgo.Describe("Propagation Messenger", func() {
                nodes = startNodes(2)
                node1, node2 := nodes[0], nodes[1]
 
-               err := node2.messenger.Propagation([]string{node1.nodeID, 
node2.nodeID}, mockGroup)
+               err := node2.messenger.Propagation([]string{node1.nodeID, 
node2.nodeID}, mockGroup, 0)
                gomega.Expect(err).NotTo(gomega.HaveOccurred())
                nodeVerify(node1, []string{node2.nodeID}, 1)
                nodeVerify(node2, []string{}, 0)
@@ -95,7 +95,7 @@ var _ = ginkgo.Describe("Propagation Messenger", func() {
                nodes = startNodes(2)
                node1, node2 := nodes[0], nodes[1]
 
-               err := node2.messenger.Propagation([]string{node1.nodeID, 
node2.nodeID, "no-existing"}, mockGroup)
+               err := node2.messenger.Propagation([]string{node1.nodeID, 
node2.nodeID, "no-existing"}, mockGroup, 0)
                gomega.Expect(err).NotTo(gomega.HaveOccurred())
                nodeVerify(node1, []string{node2.nodeID, node2.nodeID}, 2)
                nodeVerify(node2, []string{node1.nodeID}, 1)
@@ -105,7 +105,7 @@ var _ = ginkgo.Describe("Propagation Messenger", func() {
                nodes = startNodes(3)
                node1, node2, node3 := nodes[0], nodes[1], nodes[2]
 
-               err := node1.messenger.Propagation([]string{node1.nodeID, 
node2.nodeID, node3.nodeID}, mockGroup)
+               err := node1.messenger.Propagation([]string{node1.nodeID, 
node2.nodeID, node3.nodeID}, mockGroup, 0)
                gomega.Expect(err).NotTo(gomega.HaveOccurred())
                nodeVerify(node1, []string{node2.nodeID}, 1)
                nodeVerify(node2, []string{node3.nodeID}, 1)
@@ -117,7 +117,7 @@ var _ = ginkgo.Describe("Propagation Messenger", func() {
                node1, node2, node3 := nodes[0], nodes[1], nodes[2]
                node3.listener.mockErr = fmt.Errorf("mock error")
 
-               err := node1.messenger.Propagation([]string{node1.nodeID, 
node2.nodeID, node3.nodeID}, mockGroup)
+               err := node1.messenger.Propagation([]string{node1.nodeID, 
node2.nodeID, node3.nodeID}, mockGroup, 0)
                gomega.Expect(err).NotTo(gomega.HaveOccurred())
                nodeVerify(node1, []string{node2.nodeID}, 1)
                nodeVerify(node2, []string{node3.nodeID}, 1)
@@ -129,9 +129,9 @@ var _ = ginkgo.Describe("Propagation Messenger", func() {
                node1, node2, node3 := nodes[0], nodes[1], nodes[2]
                node1.listener.delay = time.Second
 
-               err := node1.messenger.Propagation([]string{node1.nodeID, 
node2.nodeID, node3.nodeID}, mockGroup)
+               err := node1.messenger.Propagation([]string{node1.nodeID, 
node2.nodeID, node3.nodeID}, mockGroup, 0)
                gomega.Expect(err).NotTo(gomega.HaveOccurred())
-               err = node2.messenger.Propagation([]string{node1.nodeID, 
node2.nodeID, node3.nodeID}, mockGroup)
+               err = node2.messenger.Propagation([]string{node1.nodeID, 
node2.nodeID, node3.nodeID}, mockGroup, 0)
                gomega.Expect(err).NotTo(gomega.HaveOccurred())
 
                // should only the first propagation execute, the second one is 
ignored
@@ -183,9 +183,8 @@ func startNodes(count int) []*nodeContext {
                gomega.Expect(err).NotTo(gomega.HaveOccurred())
 
                // starting gossip messenger
-               messenger := 
NewMessengerWithoutMetadata(observability.NewBypassRegistry())
+               messenger := 
NewMessengerWithoutMetadata(observability.NewBypassRegistry(), ports[0])
                gomega.Expect(messenger).NotTo(gomega.BeNil())
-               messenger.(*service).port = uint32(ports[0])
                addr := fmt.Sprintf("127.0.0.1:%d", ports[0])
                
messenger.(run.PreRunner).PreRun(context.WithValue(context.Background(), 
common.ContextNodeKey, common.Node{
                        NodeID: addr,
diff --git a/banyand/property/repair.go b/banyand/property/repair.go
index 3083249e..d06cb0cd 100644
--- a/banyand/property/repair.go
+++ b/banyand/property/repair.go
@@ -26,11 +26,13 @@ import (
        "fmt"
        "hash"
        "io"
+       "math/rand/v2"
        "os"
        "path"
        "path/filepath"
        "sort"
        "strconv"
+       "strings"
        "sync"
        "sync/atomic"
        "time"
@@ -42,10 +44,14 @@ import (
        "github.com/pkg/errors"
        "github.com/robfig/cron/v3"
        "go.uber.org/multierr"
+       grpclib "google.golang.org/grpc"
 
        "github.com/apache/skywalking-banyandb/api/common"
+       commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/banyand/property/gossip"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/logger"
@@ -119,7 +125,7 @@ func (r *repair) checkHasUpdates() (bool, error) {
        return true, nil
 }
 
-func (r *repair) buildStatus(ctx context.Context, snapshotPath string) (err 
error) {
+func (r *repair) buildStatus(ctx context.Context, snapshotPath string, group string) (err error) {
        startTime := time.Now()
        defer func() {
                r.metrics.totalBuildTreeFinished.Inc(1)
@@ -136,10 +142,15 @@ func (r *repair) buildStatus(ctx context.Context, 
snapshotPath string) (err erro
        sort.Sort(snapshotIDList(items))
 
        blugeConf := bluge.DefaultConfig(snapshotPath)
-       err = r.buildTree(ctx, blugeConf)
+       err = r.buildTree(ctx, blugeConf, group)
        if err != nil {
                return fmt.Errorf("building trees failure: %w", err)
        }
+       // if only a specific group is updated, the repair base status file doesn't need to be updated,
+       // because not all groups have been processed
+       if group != "" {
+               return nil
+       }
 
        var latestSnapshotID uint64
        if len(items) > 0 {
@@ -154,6 +165,9 @@ func (r *repair) buildStatus(ctx context.Context, 
snapshotPath string) (err erro
        if err != nil {
                return fmt.Errorf("marshall state failure: %w", err)
        }
+       if err = os.MkdirAll(filepath.Dir(r.statePath), storage.DirPerm); err != nil {
+               return fmt.Errorf("creating state directory failure: %w", err)
+       }
        err = os.WriteFile(r.statePath, stateVal, storage.FilePerm)
        if err != nil {
                return fmt.Errorf("writing state file failure: %w", err)
@@ -161,15 +175,23 @@ func (r *repair) buildStatus(ctx context.Context, 
snapshotPath string) (err erro
        return nil
 }
 
-func (r *repair) buildTree(ctx context.Context, conf bluge.Config) error {
+func (r *repair) buildTree(ctx context.Context, conf bluge.Config, group 
string) error {
        reader, err := bluge.OpenReader(conf)
        if err != nil {
+               // no usable snapshot means the index has no data yet
+               if strings.Contains(err.Error(), "unable to find a usable snapshot") {
+                       return nil
+               }
                return fmt.Errorf("opening index reader failure: %w", err)
        }
        defer func() {
                _ = reader.Close()
        }()
-       topNSearch := bluge.NewTopNSearch(r.batchSearchSize, 
bluge.NewMatchAllQuery())
+       query := bluge.Query(bluge.NewMatchAllQuery())
+       if group != "" {
+               query = bluge.NewTermQuery(group).SetField(groupField)
+       }
+       topNSearch := bluge.NewTopNSearch(r.batchSearchSize, query)
        topNSearch.SortBy([]string{
                fmt.Sprintf("+%s", groupField),
                fmt.Sprintf("+%s", nameField),
@@ -185,7 +207,7 @@ func (r *repair) buildTree(ctx context.Context, conf 
bluge.Config) error {
                }
                group := convert.BytesToString(sortValue[0])
                name := convert.BytesToString(sortValue[1])
-               entity := fmt.Sprintf("%s/%s/%s", group, name, 
convert.BytesToString(sortValue[2]))
+               entity := r.buildLeafNodeEntity(group, name, 
convert.BytesToString(sortValue[2]))
 
                s := newSearchingProperty(group, shaValue, entity)
                if latestProperty != nil {
@@ -226,6 +248,18 @@ func (r *repair) buildTree(ctx context.Context, conf 
bluge.Config) error {
        return nil
 }
 
+func (r *repair) buildLeafNodeEntity(group, name, entityID string) string {
+       return fmt.Sprintf("%s/%s/%s", group, name, entityID)
+}
+
+func (r *repair) parseLeafNodeEntity(entity string) (string, string, string, error) {
+       parts := strings.SplitN(entity, "/", 3)
+       if len(parts) != 3 {
+               return "", "", "", fmt.Errorf("invalid leaf node entity format: %s", entity)
+       }
+       return parts[0], parts[1], parts[2], nil
+}
+
 func (r *repair) pageSearch(
        ctx context.Context,
        reader *bluge.Reader,
@@ -318,19 +352,28 @@ const (
 
 type repairTreeNode struct {
        shaValue  string
-       id        string
+       entity    string
        tp        repairTreeNodeType
        leafStart int64
        leafCount int64
+       slotInx   int32
 }
-type repairTreeReader struct {
+
+type repairTreeReader interface {
+       read(parent *repairTreeNode, pagingSize int64, forceReadFromStart bool) ([]*repairTreeNode, error)
+       close() error
+}
+
+type repairTreeFileReader struct {
        file   *os.File
        reader *bufio.Reader
        footer *repairTreeFooter
        paging *repairTreeReaderPage
 }
 
-func (r *repair) treeReader(group string) (*repairTreeReader, error) {
+func (r *repair) treeReader(group string) (repairTreeReader, error) {
+       r.scheduler.treeLocker.RLock()
+       defer r.scheduler.treeLocker.RUnlock()
        groupFile := fmt.Sprintf(r.composeTreeFilePathFmt, group)
        file, err := os.OpenFile(groupFile, os.O_RDONLY, os.ModePerm)
        if err != nil {
@@ -340,7 +383,7 @@ func (r *repair) treeReader(group string) 
(*repairTreeReader, error) {
                }
                return nil, fmt.Errorf("opening repair tree file %s failure: 
%w", group, err)
        }
-       reader := &repairTreeReader{
+       reader := &repairTreeFileReader{
                file:   file,
                reader: bufio.NewReader(file),
        }
@@ -351,7 +394,18 @@ func (r *repair) treeReader(group string) 
(*repairTreeReader, error) {
        return reader, nil
 }
 
-func (r *repairTreeReader) readFoot() error {
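+// stateFileExist reports whether the repair state file has already been written.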
+func (r *repair) stateFileExist() (bool, error) {
+       _, err := os.Stat(r.statePath)
+       if err != nil {
+               if os.IsNotExist(err) {
+                       return false, nil
+               }
+               return false, fmt.Errorf("checking state file existence 
failure: %w", err)
+       }
+       return true, nil
+}
+
+func (r *repairTreeFileReader) readFoot() error {
        stat, err := r.file.Stat()
        if err != nil {
                return fmt.Errorf("getting file stat for %s failure: %w", 
r.file.Name(), err)
@@ -386,7 +440,7 @@ func (r *repairTreeReader) readFoot() error {
        return nil
 }
 
-func (r *repairTreeReader) seekPosition(offset int64, whence int) error {
+func (r *repairTreeFileReader) seekPosition(offset int64, whence int) error {
        _, err := r.file.Seek(offset, whence)
        if err != nil {
                return fmt.Errorf("seeking position failure: %w", err)
@@ -395,7 +449,7 @@ func (r *repairTreeReader) seekPosition(offset int64, 
whence int) error {
        return nil
 }
 
-func (r *repairTreeReader) read(parent *repairTreeNode, pagingSize int64) 
([]*repairTreeNode, error) {
+func (r *repairTreeFileReader) read(parent *repairTreeNode, pagingSize int64, 
forceReFromStart bool) ([]*repairTreeNode, error) {
        if parent == nil {
                // reading the root node
                err := r.seekPosition(r.footer.slotNodeFinishedOffset, 
io.SeekStart)
@@ -421,7 +475,7 @@ func (r *repairTreeReader) read(parent *repairTreeNode, 
pagingSize int64) ([]*re
        var err error
        if parent.tp == repairTreeNodeTypeRoot {
                needSeek := false
-               if r.paging == nil || r.paging.lastNode != parent {
+               if r.paging == nil || r.paging.lastNode != parent || 
forceReFromStart {
                        needSeek = true
                        r.paging = newRepairTreeReaderPage(parent, 
r.footer.slotNodeCount)
                }
@@ -458,7 +512,7 @@ func (r *repairTreeReader) read(parent *repairTreeNode, 
pagingSize int64) ([]*re
                        }
                        nodes = append(nodes, &repairTreeNode{
                                shaValue:  string(slotShaVal),
-                               id:        fmt.Sprintf("%d", slotNodeIndex),
+                               slotInx:   int32(slotNodeIndex),
                                tp:        repairTreeNodeTypeSlot,
                                leafStart: leafStartOff,
                                leafCount: leafCount,
@@ -472,9 +526,9 @@ func (r *repairTreeReader) read(parent *repairTreeNode, 
pagingSize int64) ([]*re
 
        // otherwise, reading the leaf nodes
        needSeek := false
-       if r.paging == nil || r.paging.lastNode != parent {
+       if r.paging == nil || r.paging.lastNode != parent || forceReFromStart {
                needSeek = true
-               r.paging = newRepairTreeReaderPage(parent, 
r.footer.slotNodeCount)
+               r.paging = newRepairTreeReaderPage(parent, parent.leafCount)
        }
        if needSeek {
                err = r.seekPosition(parent.leafStart, io.SeekStart)
@@ -499,15 +553,16 @@ func (r *repairTreeReader) read(parent *repairTreeNode, 
pagingSize int64) ([]*re
                        return nil, fmt.Errorf("decoding leaf node sha value 
from file %s failure: %w", r.file.Name(), err)
                }
                nodes = append(nodes, &repairTreeNode{
+                       slotInx:  parent.slotInx,
                        shaValue: string(shaVal),
-                       id:       string(entity),
+                       entity:   string(entity),
                        tp:       repairTreeNodeTypeLeaf,
                })
        }
        return nodes, nil
 }
 
-func (r *repairTreeReader) close() error {
+func (r *repairTreeFileReader) close() error {
        return r.file.Close()
 }
 
@@ -528,13 +583,65 @@ func (r *repairTreeReaderPage) nextPage(count int64) 
int64 {
                return 0
        }
        readCount := r.reduceCount - count
-       if readCount < 0 {
+       if readCount <= 0 {
                readCount = r.reduceCount
        }
        r.reduceCount -= readCount
        return readCount
 }
 
+type repairBufferLeafReader struct {
+       reader      repairTreeReader
+       currentSlot *repairTreeNode
+       pagingLeafs []*repairTreeNode
+}
+
+func newRepairBufferLeafReader(reader repairTreeReader) 
*repairBufferLeafReader {
+       return &repairBufferLeafReader{
+               reader: reader,
+       }
+}
+
+func (r *repairBufferLeafReader) next(slot *repairTreeNode) (*repairTreeNode, 
error) {
+       // slot is nil means reset the reader
+       if slot == nil {
+               r.currentSlot = nil
+               return nil, nil
+       }
+       if r.currentSlot == nil || r.currentSlot.slotInx != slot.slotInx {
+               // if the current slot is nil or the slot index is changed, we 
need to read the leaf nodes from the slot
+               err := r.readNodes(slot, true)
+               if err != nil {
+                       return nil, err
+               }
+       }
+       // if no buffered leaf nodes remain, try to read the next page from the slot
+       if len(r.pagingLeafs) == 0 {
+               err := r.readNodes(slot, false)
+               if err != nil {
+                       return nil, err
+               }
+               // no more leaf nodes to read, return nil
+               if len(r.pagingLeafs) == 0 {
+                       return nil, nil
+               }
+       }
+       // pop the first leaf node from the paging leafs
+       leaf := r.pagingLeafs[0]
+       r.pagingLeafs = r.pagingLeafs[1:]
+       return leaf, nil
+}
+
+func (r *repairBufferLeafReader) readNodes(slot *repairTreeNode, 
forceReadFromStart bool) error {
+       nodes, err := r.reader.read(slot, repairBatchSearchSize, 
forceReadFromStart)
+       if err != nil {
+               return fmt.Errorf("reading leaf nodes from slot %d failure: 
%w", slot.slotInx, err)
+       }
+       r.pagingLeafs = nodes
+       r.currentSlot = slot
+       return nil
+}
+
 type repairTreeFooter struct {
        leafNodeFinishedOffset int64
        slotNodeCount          int64
@@ -824,24 +931,30 @@ func newRepairMetrics(fac *observability.Factory) 
*repairMetrics {
 type repairScheduler struct {
        latestBuildTreeSchedule   time.Time
        buildTreeClock            clock.Clock
-       l                         *logger.Logger
+       gossipMessenger           gossip.Messenger
        closer                    *run.Closer
        buildSnapshotFunc         func(context.Context) (string, error)
        repairTreeScheduler       *timestamp.Scheduler
        quickRepairNotified       *int32
        db                        *database
+       l                         *logger.Logger
        metrics                   *repairSchedulerMetrics
+       treeSlotCount             int
        buildTreeScheduleInterval time.Duration
        quickBuildTreeTime        time.Duration
        lastBuildTimeLocker       sync.Mutex
-       buildTreeLocker           sync.Mutex
+       treeLocker                sync.RWMutex
 }
 
+// nolint: contextcheck
 func newRepairScheduler(
        l *logger.Logger,
        omr observability.MetricsRegistry,
-       cronExp string,
+       buildTreeCronExp string,
        quickBuildTreeTime time.Duration,
+       triggerCronExp string,
+       gossipMessenger gossip.Messenger,
+       treeSlotCount int,
        db *database,
        buildSnapshotFunc func(context.Context) (string, error),
 ) (*repairScheduler, error) {
@@ -855,16 +968,29 @@ func newRepairScheduler(
                closer:              run.NewCloser(1),
                quickBuildTreeTime:  quickBuildTreeTime,
                metrics:             
newRepairSchedulerMetrics(omr.With(propertyScope.SubScope("scheduler"))),
+               gossipMessenger:     gossipMessenger,
+               treeSlotCount:       treeSlotCount,
        }
        c := timestamp.NewScheduler(l, s.buildTreeClock)
        s.repairTreeScheduler = c
-       err := c.Register("repair", cron.Descriptor, cronExp, func(t time.Time, 
_ *logger.Logger) bool {
-               s.doRepairScheduler(t, true)
+       err := c.Register("build-tree", cron.Descriptor, buildTreeCronExp, 
func(t time.Time, _ *logger.Logger) bool {
+               s.doBuildTreeScheduler(t, true)
                return true
        })
        if err != nil {
                return nil, fmt.Errorf("failed to add repair build tree cron 
task: %w", err)
        }
+       err = c.Register("trigger", 
cron.Minute|cron.Hour|cron.Dom|cron.Month|cron.Dow|cron.Descriptor,
+               triggerCronExp, func(time.Time, *logger.Logger) bool {
+                       gossipErr := s.doRepairGossip(s.closer.Ctx())
+                       if gossipErr != nil {
+                               s.l.Err(fmt.Errorf("repair gossip failure: %w", 
gossipErr))
+                       }
+                       return true
+               })
+       if err != nil {
+               return nil, fmt.Errorf("failed to add repair trigger cron task: 
%w", err)
+       }
        err = s.initializeInterval()
        if err != nil {
                return nil, err
@@ -872,18 +998,18 @@ func newRepairScheduler(
        return s, nil
 }
 
-func (r *repairScheduler) doRepairScheduler(t time.Time, triggerByCron bool) {
-       if !r.verifyShouldExecute(t, triggerByCron) {
+func (r *repairScheduler) doBuildTreeScheduler(t time.Time, triggerByCron 
bool) {
+       if !r.verifyShouldExecuteBuildTree(t, triggerByCron) {
                return
        }
 
-       err := r.doRepair()
+       err := r.doBuildTree()
        if err != nil {
                r.l.Err(fmt.Errorf("repair build status failure: %w", err))
        }
 }
 
-func (r *repairScheduler) verifyShouldExecute(t time.Time, triggerByCron bool) 
bool {
+func (r *repairScheduler) verifyShouldExecuteBuildTree(t time.Time, 
triggerByCron bool) bool {
        r.lastBuildTimeLocker.Lock()
        defer r.lastBuildTimeLocker.Unlock()
        if !triggerByCron {
@@ -900,7 +1026,7 @@ func (r *repairScheduler) verifyShouldExecute(t time.Time, 
triggerByCron bool) b
 func (r *repairScheduler) initializeInterval() error {
        r.lastBuildTimeLocker.Lock()
        defer r.lastBuildTimeLocker.Unlock()
-       interval, nextTime, exist := r.repairTreeScheduler.Interval("repair")
+       interval, nextTime, exist := 
r.repairTreeScheduler.Interval("build-tree")
        if !exist {
                return fmt.Errorf("failed to get repair build tree cron task 
interval")
        }
@@ -910,7 +1036,7 @@ func (r *repairScheduler) initializeInterval() error {
 }
 
 //nolint:contextcheck
-func (r *repairScheduler) doRepair() (err error) {
+func (r *repairScheduler) doBuildTree() (err error) {
        now := time.Now()
        r.metrics.totalRepairBuildTreeStarted.Inc(1)
        defer func() {
@@ -920,11 +1046,6 @@ func (r *repairScheduler) doRepair() (err error) {
                        r.metrics.totalRepairBuildTreeFailures.Inc(1)
                }
        }()
-       if !r.buildTreeLocker.TryLock() {
-               return nil
-       }
-       defer r.buildTreeLocker.Unlock()
-       // check each shard have any updates
        sLst := r.db.sLst.Load()
        if sLst == nil {
                return nil
@@ -945,6 +1066,22 @@ func (r *repairScheduler) doRepair() (err error) {
        }
 
        // otherwise, we need to build the trees
+       return r.buildingTree(nil, "", false)
+}
+
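+// buildingTree rebuilds the repair trees for the given shards (all shards when the list is empty),
+// optionally restricted to one group; force waits for the tree lock instead of skipping the build.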
+// nolint: contextcheck
+func (r *repairScheduler) buildingTree(shards []common.ShardID, group string, 
force bool) error {
+       if force {
+               r.treeLocker.Lock()
+       } else if !r.treeLocker.TryLock() {
+               // if not forced, we try to lock the build tree locker
+               r.metrics.totalRepairBuildTreeConflicts.Inc(1)
+               return nil
+       }
+       defer r.treeLocker.Unlock()
+
+       buildAll := len(shards) == 0
+
        // take a new snapshot first
        snapshotPath, err := r.buildSnapshotFunc(r.closer.Ctx())
        if err != nil {
@@ -955,11 +1092,24 @@ func (r *repairScheduler) doRepair() (err error) {
                if err != nil {
                        return err
                }
+               if !buildAll {
+                       // if not building all shards, check if the shard is in 
the list
+                       found := false
+                       for _, s := range shards {
+                               if s == common.ShardID(id) {
+                                       found = true
+                                       break
+                               }
+                       }
+                       if !found {
+                               return nil // skip this shard
+                       }
+               }
                s, err := r.db.loadShard(r.closer.Ctx(), common.ShardID(id))
                if err != nil {
                        return fmt.Errorf("loading shard %d failure: %w", id, 
err)
                }
-               err = s.repairState.buildStatus(r.closer.Ctx(), 
path.Join(snapshotPath, fmt.Sprintf("shard-%s", suffix)))
+               err = s.repairState.buildStatus(r.closer.Ctx(), 
path.Join(snapshotPath, fmt.Sprintf("shard-%s", suffix)), group)
                if err != nil {
                        return fmt.Errorf("building status for shard %d 
failure: %w", id, err)
                }
@@ -977,7 +1127,7 @@ func (r *repairScheduler) documentUpdatesNotify() {
                case <-r.closer.CloseNotify():
                        return
                case <-time.After(r.quickBuildTreeTime):
-                       r.doRepairScheduler(r.buildTreeClock.Now(), false)
+                       r.doBuildTreeScheduler(r.buildTreeClock.Now(), false)
                        // reset the notified flag to allow the next 
notification
                        atomic.StoreInt32(r.quickRepairNotified, 0)
                }
@@ -990,18 +1140,81 @@ func (r *repairScheduler) close() {
        r.closer.CloseThenWait()
 }
 
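+// doRepairGossip picks a random property group and shard, locates its replica nodes,
+// and starts a gossip propagation round among them.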
+func (r *repairScheduler) doRepairGossip(ctx context.Context) error {
+       group, shardNum, err := r.randomSelectGroup(ctx)
+       if err != nil {
+               return fmt.Errorf("selecting random group failure: %w", err)
+       }
+
+       nodes, err := r.gossipMessenger.LocateNodes(group.Metadata.Name, 
shardNum, uint32(r.copiesCount(group)))
+       if err != nil {
+               return fmt.Errorf("locating nodes for group %s, shard %d 
failure: %w", group.Metadata.Name, shardNum, err)
+       }
+       return r.gossipMessenger.Propagation(nodes, group.Metadata.Name, 
shardNum)
+}
+
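+// randomSelectGroup picks a random property group that keeps at least two copies, plus a random shard number within it.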
+func (r *repairScheduler) randomSelectGroup(ctx context.Context) 
(*commonv1.Group, uint32, error) {
+       allGroups, err := r.db.metadata.GroupRegistry().ListGroup(ctx)
+       if err != nil {
+               return nil, 0, fmt.Errorf("listing groups failure: %w", err)
+       }
+
+       groups := make([]*commonv1.Group, 0)
+       for _, group := range allGroups {
+               if group.Catalog != commonv1.Catalog_CATALOG_PROPERTY {
+                       continue
+               }
+               // if the group doesn't have at least two copies, skip it
+               if r.copiesCount(group) < 2 {
+                       continue
+               }
+               groups = append(groups, group)
+       }
+
+       if len(groups) == 0 {
+               return nil, 0, fmt.Errorf("no groups found with enough copies 
for repair")
+       }
+       // #nosec G404 -- not security-critical, just for random selection
+       group := groups[rand.Int64()%int64(len(groups))]
+       // #nosec G404 -- not security-critical, just for random selection
+       return group, rand.Uint32() % group.ResourceOpts.ShardNum, nil
+}
+
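+// copiesCount returns the number of copies of a group's data: the replicas plus the primary.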
+func (r *repairScheduler) copiesCount(group *commonv1.Group) int {
+       return int(group.ResourceOpts.Replicas + 1)
+}
+
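+// registerServerToGossip returns a register function that exposes the RepairService on the gossip gRPC server.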
+func (r *repairScheduler) registerServerToGossip() func(server 
*grpclib.Server) {
+       s := newRepairGossipServer(r)
+       return func(server *grpclib.Server) {
+               propertyv1.RegisterRepairServiceServer(server, s)
+       }
+}
+
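+// registerClientToGossip subscribes the repair gossip client as a listener on the gossip messenger.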
+func (r *repairScheduler) registerClientToGossip(messenger gossip.Messenger) {
+       messenger.Subscribe(newRepairGossipClient(r))
+}
+
 type repairSchedulerMetrics struct {
-       totalRepairBuildTreeStarted  meter.Counter
-       totalRepairBuildTreeFinished meter.Counter
-       totalRepairBuildTreeFailures meter.Counter
-       totalRepairBuildTreeLatency  meter.Counter
+       totalRepairBuildTreeStarted   meter.Counter
+       totalRepairBuildTreeFinished  meter.Counter
+       totalRepairBuildTreeFailures  meter.Counter
+       totalRepairBuildTreeLatency   meter.Counter
+       totalRepairBuildTreeConflicts meter.Counter
+
+       totalRepairSuccessCount meter.Counter
+       totalRepairFailedCount  meter.Counter
 }
 
 func newRepairSchedulerMetrics(omr *observability.Factory) 
*repairSchedulerMetrics {
        return &repairSchedulerMetrics{
-               totalRepairBuildTreeStarted:  
omr.NewCounter("repair_build_tree_started"),
-               totalRepairBuildTreeFinished: 
omr.NewCounter("repair_build_tree_finished"),
-               totalRepairBuildTreeFailures: 
omr.NewCounter("repair_build_tree_failures"),
-               totalRepairBuildTreeLatency:  
omr.NewCounter("repair_build_tree_latency"),
+               totalRepairBuildTreeStarted:   
omr.NewCounter("repair_build_tree_started"),
+               totalRepairBuildTreeFinished:  
omr.NewCounter("repair_build_tree_finished"),
+               totalRepairBuildTreeFailures:  
omr.NewCounter("repair_build_tree_failures"),
+               totalRepairBuildTreeLatency:   
omr.NewCounter("repair_build_tree_latency"),
+               totalRepairBuildTreeConflicts: 
omr.NewCounter("repair_build_tree_conflicts"),
+
+               totalRepairSuccessCount: 
omr.NewCounter("property_repair_success_count", "group", "shard"),
+               totalRepairFailedCount:  
omr.NewCounter("property_repair_failure_count", "group", "shard"),
        }
 }
diff --git a/banyand/property/repair_gossip.go 
b/banyand/property/repair_gossip.go
new file mode 100644
index 00000000..91d11e4a
--- /dev/null
+++ b/banyand/property/repair_gossip.go
@@ -0,0 +1,891 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package property
+
+import (
+       "context"
+       "fmt"
+       "io"
+       "strings"
+
+       "github.com/pkg/errors"
+       grpclib "google.golang.org/grpc"
+       "google.golang.org/protobuf/encoding/protojson"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       "github.com/apache/skywalking-banyandb/banyand/property/gossip"
+       "github.com/apache/skywalking-banyandb/pkg/index/inverted"
+)
+
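+// page sizes used when reading Merkle tree nodes and when querying properties
+// from a shard during gossip-based repair.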
+var (
+       gossipMerkleTreeReadPageSize int64 = 10
+       gossipShardQueryDatabaseSize       = 100
+)
+
+type repairGossipBase struct {
+       scheduler *repairScheduler
+}
+
+func (b *repairGossipBase) getTreeReader(ctx context.Context, group string, 
shardID uint32) (repairTreeReader, bool, error) {
+       s, err := b.scheduler.db.loadShard(ctx, common.ShardID(shardID))
+       if err != nil {
+               return nil, false, fmt.Errorf("failed to load shard %d: %w", 
shardID, err)
+       }
+       tree, err := s.repairState.treeReader(group)
+       if err != nil {
+               return nil, false, fmt.Errorf("failed to get tree reader for 
group %s: %w", group, err)
+       }
+       if tree == nil {
+               // the tree is nil: if the state file exists, the tree (group) is simply empty
+               stateExist, err := s.repairState.stateFileExist()
+               if err != nil {
+                       return nil, false, fmt.Errorf("failed to check state 
file existence for group %s: %w", group, err)
+               }
+               // return an empty reader; the tree is considered found only if the state file exists
+               return &emptyRepairTreeReader{}, stateExist, nil
+       }
+       return tree, true, nil
+}
+
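+// sendTreeSummary streams the local Merkle tree summary to the peer: it sends the tree
+// root SHA first and waits for the RootCompare response; if the roots already match it
+// returns early, otherwise it pages through the slot nodes, sends their SHA values, and
+// terminates the summary with an empty slot list.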
+func (b *repairGossipBase) sendTreeSummary(
+       reader repairTreeReader,
+       group string,
+       shardID uint32,
+       stream grpclib.BidiStreamingClient[propertyv1.RepairRequest, 
propertyv1.RepairResponse],
+) (root *repairTreeNode, rootMatches bool, err error) {
+       roots, err := reader.read(nil, 1, false)
+       if err != nil {
+               return nil, false, fmt.Errorf("failed to read tree root: %w", 
err)
+       }
+       if len(roots) == 0 {
+               return nil, false, fmt.Errorf("tree root is empty for group 
%s", group)
+       }
+
+       err = stream.Send(&propertyv1.RepairRequest{
+               Data: &propertyv1.RepairRequest_TreeRoot{
+                       TreeRoot: &propertyv1.TreeRoot{
+                               Group:   group,
+                               ShardId: shardID,
+                               RootSha: roots[0].shaValue,
+                       },
+               },
+       })
+       if err != nil {
+               return nil, false, fmt.Errorf("failed to send tree root for 
group %s: %w", group, err)
+       }
+
+       recv, err := stream.Recv()
+       if err != nil {
+               return nil, false, fmt.Errorf("failed to receive tree summary 
response for group %s: %w", group, err)
+       }
+       rootCompare, ok := recv.Data.(*propertyv1.RepairResponse_RootCompare)
+       if !ok {
+               return nil, false, fmt.Errorf("unexpected response type: %T, 
expected RootCompare", recv.Data)
+       }
+       if !rootCompare.RootCompare.TreeFound {
+               return nil, false, fmt.Errorf("server tree not found for group: 
%s", group)
+       }
+       if rootCompare.RootCompare.RootShaMatch {
+               return roots[0], true, nil
+       }
+
+       var slots []*repairTreeNode
+       for {
+               slots, err = reader.read(roots[0], 
gossipMerkleTreeReadPageSize, false)
+               if err != nil {
+                       return nil, false, fmt.Errorf("failed to read slots for 
group %s: %w", group, err)
+               }
+               if len(slots) == 0 {
+                       break
+               }
+
+               slotReq := &propertyv1.TreeSlots{
+                       SlotSha: make([]*propertyv1.TreeSlotSHA, 0, len(slots)),
+               }
+               for _, s := range slots {
+                       slotReq.SlotSha = append(slotReq.SlotSha, 
&propertyv1.TreeSlotSHA{
+                               Slot:  s.slotInx,
+                               Value: s.shaValue,
+                       })
+               }
+
+               err = stream.Send(&propertyv1.RepairRequest{
+                       Data: &propertyv1.RepairRequest_TreeSlots{
+                               TreeSlots: slotReq,
+                       },
+               })
+               if err != nil {
+                       return nil, false, fmt.Errorf("failed to send tree 
slots for group %s: %w", group, err)
+               }
+       }
+
+       // sending an empty slot list tells the server there are no more slots to send
+       err = stream.Send(&propertyv1.RepairRequest{
+               Data: &propertyv1.RepairRequest_TreeSlots{
+                       TreeSlots: &propertyv1.TreeSlots{
+                               SlotSha: []*propertyv1.TreeSlotSHA{},
+                       },
+               },
+       })
+       if err != nil {
+               return nil, false, fmt.Errorf("failed to send empty tree slots 
for group %s: %w", group, err)
+       }
+
+       return roots[0], false, nil
+}
+
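+// queryProperty resolves a leaf-node entity back to its group, name, and entity id,
+// searches the shard for every copy, and returns the copy with the latest timestamp
+// together with the decoded Property. Both results are nil when nothing is found.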
+func (b *repairGossipBase) queryProperty(ctx context.Context, syncShard 
*shard, leafNodeEntity string) (*queryProperty, *propertyv1.Property, error) {
+       g, n, entity, err := 
syncShard.repairState.parseLeafNodeEntity(leafNodeEntity)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to parse leaf node entity 
%s: %w", leafNodeEntity, err)
+       }
+       searchQuery, err := inverted.BuildPropertyQueryFromEntity(groupField, 
g, n, entityID, entity)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to build query from leaf 
node entity %s: %w", leafNodeEntity, err)
+       }
+       queriedProperties, err := syncShard.search(ctx, searchQuery, 
gossipShardQueryDatabaseSize)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to search properties for 
leaf node entity %s: %w", leafNodeEntity, err)
+       }
+       var latestProperty *queryProperty
+       for _, queried := range queriedProperties {
+               if latestProperty == nil || queried.timestamp > 
latestProperty.timestamp {
+                       latestProperty = queried
+               }
+       }
+       if latestProperty == nil {
+               return nil, nil, nil
+       }
+       var p propertyv1.Property
+       err = protojson.Unmarshal(latestProperty.source, &p)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to unmarshal property from 
leaf node entity %s: %w", leafNodeEntity, err)
+       }
+       return latestProperty, &p, nil
+}
+
+type repairGossipClient struct {
+       repairGossipBase
+}
+
+func newRepairGossipClient(s *repairScheduler) *repairGossipClient {
+       return &repairGossipClient{
+               repairGossipBase: repairGossipBase{
+                       scheduler: s,
+               },
+       }
+}
+
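+// Rev drives the client side of a repair round against the next gossip node: it sends
+// the local tree summary, consumes the differing leaf nodes reported by the server,
+// exchanges PropertySync messages in both directions, and rebuilds the local Merkle
+// tree when any property has been updated.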
+func (r *repairGossipClient) Rev(ctx context.Context, nextNode 
*grpclib.ClientConn, request *propertyv1.PropagationRequest) error {
+       client := propertyv1.NewRepairServiceClient(nextNode)
+       var hasPropertyUpdated bool
+       defer func() {
+               if hasPropertyUpdated {
+                       err := 
r.scheduler.buildingTree([]common.ShardID{common.ShardID(request.ShardId)}, 
request.Group, true)
+                       if err != nil {
+                               r.scheduler.l.Warn().Err(err).Msgf("failed to 
rebuild tree for group %s, shard %d", request.Group, request.ShardId)
+                       }
+               }
+       }()
+       reader, found, err := r.getTreeReader(ctx, request.Group, 
request.ShardId)
+       if err != nil {
+               return errors.Wrapf(gossip.ErrAbortPropagation, "failed to get 
tree reader on client side: %v", err)
+       }
+       if !found {
+               return errors.Wrapf(gossip.ErrAbortPropagation, "tree for group 
%s, shard %d not found on client side", request.Group, request.ShardId)
+       }
+       defer reader.close()
+
+       stream, err := client.Repair(ctx)
+       if err != nil {
+               return fmt.Errorf("failed to create repair stream: %w", err)
+       }
+       defer func() {
+               _ = stream.CloseSend()
+       }()
+
+       // step 1: send merkle tree data
+       rootNode, rootMatch, err := r.sendTreeSummary(reader, request.Group, 
request.ShardId, stream)
+       if err != nil {
+               // if the tree summary cannot be built, we should abort the 
propagation
+               return errors.Wrapf(gossip.ErrAbortPropagation, "failed to 
query/send tree summary on client side: %v", err)
+       }
+       // if the root nodes match, the trees are identical and the repair can be skipped
+       if rootMatch {
+               return nil
+       }
+
+       syncShard, err := r.scheduler.db.loadShard(ctx, 
common.ShardID(request.ShardId))
+       if err != nil {
+               return errors.Wrapf(gossip.ErrAbortPropagation, "shard %d load 
failure on client side: %v", request.ShardId, err)
+       }
+       firstTreeSummaryResp := true
+
+       leafReader := newRepairBufferLeafReader(reader)
+       var currentComparingClientNode *repairTreeNode
+       var notProcessingClientNode *repairTreeNode
+       for {
+               recvResp, err := stream.Recv()
+               if err != nil {
+                       if errors.Is(err, io.EOF) {
+                               return nil
+                       }
+                       return fmt.Errorf("failed to keep receive tree summary 
from server: %w", err)
+               }
+
+               switch resp := recvResp.Data.(type) {
+               case *propertyv1.RepairResponse_DifferTreeSummary:
+                       // step 2: check with the server for differing leaf nodes
+                       // if there are no differing nodes, the repair can be skipped
+                       if firstTreeSummaryResp && 
len(resp.DifferTreeSummary.Nodes) == 0 {
+                               return nil
+                       }
+                       r.handleDifferSummaryFromServer(ctx, stream, 
resp.DifferTreeSummary, reader, syncShard, rootNode, leafReader, 
&notProcessingClientNode, &currentComparingClientNode)
+                       firstTreeSummaryResp = false
+               case *propertyv1.RepairResponse_PropertySync:
+                       // step 3: keep receiving messages from the server
+                       // if the server is still sending differing nodes, keep reading them
+                       // if the server sends a PropertySync, repair the property and send the newer version back to the server if needed
+                       sync := resp.PropertySync
+                       updated, newer, err := syncShard.repair(ctx, sync.Id, 
sync.Property, sync.DeleteTime)
+                       if err != nil {
+                               r.scheduler.l.Warn().Err(err).Msgf("failed to 
repair property %s", sync.Id)
+                               
r.scheduler.metrics.totalRepairFailedCount.Inc(1, request.Group, 
fmt.Sprintf("%d", request.ShardId))
+                       }
+                       if updated {
+                               r.scheduler.l.Debug().Msgf("successfully 
repaired property %s on client side", sync.Id)
+                               
r.scheduler.metrics.totalRepairSuccessCount.Inc(1, request.Group, 
fmt.Sprintf("%d", request.ShardId))
+                               hasPropertyUpdated = true
+                               continue
+                       }
+                       // if the property hasn't been updated and the newer property is not nil,
+                       // the local copy is newer than the server's copy,
+                       if !updated && newer != nil {
+                               var p propertyv1.Property
+                               err = protojson.Unmarshal(newer.source, &p)
+                               if err != nil {
+                                       
r.scheduler.l.Warn().Err(err).Msgf("failed to unmarshal property from db by 
entity %s", newer.id)
+                                       continue
+                               }
+                               // send the newer property to the server
+                               err = stream.Send(&propertyv1.RepairRequest{
+                                       Data: 
&propertyv1.RepairRequest_PropertySync{
+                                               PropertySync: 
&propertyv1.PropertySync{
+                                                       Id:         newer.id,
+                                                       Property:   &p,
+                                                       DeleteTime: 
newer.deleteTime,
+                                               },
+                                       },
+                               })
+                               if err != nil {
+                                       
r.scheduler.l.Warn().Err(err).Msgf("failed to send newer property sync response 
to server, entity: %s", newer.id)
+                               }
+                       }
+               default:
+                       r.scheduler.l.Warn().Msgf("unexpected response type: 
%T, expected DifferTreeSummary or PropertySync", resp)
+               }
+       }
+}
+
+func (r *repairGossipClient) sendPropertyMissing(stream 
grpclib.BidiStreamingClient[propertyv1.RepairRequest, 
propertyv1.RepairResponse], entity string) {
+       err := stream.Send(&propertyv1.RepairRequest{
+               Data: &propertyv1.RepairRequest_PropertyMissing{
+                       PropertyMissing: &propertyv1.PropertyMissing{
+                               Entity: entity,
+                       },
+               },
+       })
+       if err != nil {
+               r.scheduler.l.Warn().Err(err).Msgf("failed to send property 
missing response to client, entity: %s", entity)
+       }
+}
+
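+// handleDifferSummaryFromServer walks the differing leaf nodes reported by the server:
+// slots the server lacks are pushed to it wholesale, while slots present on both sides
+// are compared leaf by leaf. An empty summary means the comparison is finished, so any
+// remaining client-side leaves are flushed and a NoMorePropertySync request is sent.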
+func (r *repairGossipClient) handleDifferSummaryFromServer(
+       ctx context.Context,
+       stream grpclib.BidiStreamingClient[propertyv1.RepairRequest, 
propertyv1.RepairResponse],
+       differTreeSummary *propertyv1.DifferTreeSummary,
+       reader repairTreeReader,
+       syncShard *shard,
+       rootNode *repairTreeNode,
+       bufSlotReader *repairBufferLeafReader,
+       notProcessingClientNode **repairTreeNode,
+       currentComparingClientNode **repairTreeNode,
+) {
+       // if there are no more differing nodes, the client can send a no-more-property-sync request to notify the server
+       if len(differTreeSummary.Nodes) == 0 {
+               // if the client node currently being compared is not empty, the client still has unprocessed leaf nodes;
+               // query their properties and send them to the server
+               if *currentComparingClientNode != nil {
+                       // read all remaining properties on the client side and send them to the server
+                       r.readingReduceLeafAndSendProperties(ctx, syncShard, 
stream, bufSlotReader, *currentComparingClientNode)
+                       *currentComparingClientNode = nil
+               }
+               if *notProcessingClientNode != nil {
+                       // if there is still an unprocessed client node, the client has a property the server does not have;
+                       // query it and send it to the server
+                       r.queryPropertyAndSendToServer(ctx, syncShard, 
(*notProcessingClientNode).entity, stream)
+               }
+               err := stream.Send(&propertyv1.RepairRequest{
+                       Data: &propertyv1.RepairRequest_NoMorePropertySync{
+                               NoMorePropertySync: 
&propertyv1.NoMorePropertySync{},
+                       },
+               })
+               if err != nil {
+                       r.scheduler.l.Warn().Err(err).Msgf("failed to send no 
more property sync request to server")
+               }
+               return
+       }
+
+       // keep reading the tree summary until there are no more different nodes
+       for _, node := range differTreeSummary.Nodes {
+               select {
+               case <-ctx.Done():
+                       r.scheduler.l.Warn().Msgf("context done while handling 
differ summary from server")
+                       return
+               default:
+                       break
+               }
+               // if the slot doesn't exist on the server side, send all of its real property data to the server
+               if !node.Exists {
+                       clientSlotNode, findError := 
r.findSlotNodeByRoot(reader, rootNode, node.SlotIndex)
+                       if findError != nil {
+                               
r.scheduler.l.Warn().Err(findError).Msgf("client slot %d not exist", 
node.SlotIndex)
+                               continue
+                       }
+                       // read the leaf nodes from the client side and send 
properties to the server
+                       r.readingReduceLeafAndSendProperties(ctx, syncShard, 
stream, bufSlotReader, clientSlotNode)
+                       continue
+               }
+
+               needsFindSlot := false
+               if *currentComparingClientNode != nil && 
(*currentComparingClientNode).slotInx != node.SlotIndex {
+                       // the slot being compared has changed; if the client side still has remaining properties,
+                       // read them all and send them to the server
+                       r.readingReduceLeafAndSendProperties(ctx, syncShard, 
stream, bufSlotReader, *currentComparingClientNode)
+                       needsFindSlot = true
+               } else if *currentComparingClientNode == nil {
+                       needsFindSlot = true
+               }
+
+               if needsFindSlot {
+                       clientSlotNode, findError := 
r.findSlotNodeByRoot(reader, rootNode, node.SlotIndex)
+                       // if the slot doesn't exist on the client side, ask the server for the leaf node's property data
+                       if findError != nil {
+                               r.sendPropertyMissing(stream, node.Entity)
+                               continue
+                       }
+                       *currentComparingClientNode = clientSlotNode
+               }
+
+               r.readingClientNodeAndCompare(ctx, syncShard, node, 
bufSlotReader, *currentComparingClientNode, stream, notProcessingClientNode)
+       }
+}
+
+func (r *repairGossipClient) readingReduceLeafAndSendProperties(
+       ctx context.Context,
+       syncShard *shard,
+       stream grpclib.BidiStreamingClient[propertyv1.RepairRequest, 
propertyv1.RepairResponse],
+       reader *repairBufferLeafReader, parent *repairTreeNode,
+) {
+       // read the leaf nodes from the client side
+       for {
+               leafNode, err := reader.next(parent)
+               if err != nil {
+                       r.scheduler.l.Warn().Err(err).Msgf("failed to read leaf 
nodes from client side")
+                       break
+               }
+               if leafNode == nil {
+                       break
+               }
+               // read the real property data for the leaf node and send it to the server
+               r.queryPropertyAndSendToServer(ctx, syncShard, leafNode.entity, 
stream)
+       }
+}
+
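+// readingClientNodeAndCompare merges the server's leaf node with the client's leaf
+// stream in entity order: equal entities with different SHA values are pushed to the
+// server, entities only present on the server are requested via PropertyMissing, and
+// entities only present on the client are pushed to the server before advancing.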
+func (r *repairGossipClient) readingClientNodeAndCompare(
+       ctx context.Context,
+       syncShard *shard,
+       serverNode *propertyv1.TreeLeafNode,
+       bufReader *repairBufferLeafReader,
+       clientSlotNode *repairTreeNode,
+       stream grpclib.BidiStreamingClient[propertyv1.RepairRequest, 
propertyv1.RepairResponse],
+       notProcessingClientNode **repairTreeNode,
+) {
+       var clientLeafNode *repairTreeNode
+       // if the unprocessed node is not nil, the client side has a leaf node that has not been handled yet
+       if *notProcessingClientNode != nil {
+               clientLeafNode = *notProcessingClientNode
+       } else {
+               node, err := bufReader.next(clientSlotNode)
+               if err != nil {
+                       r.sendPropertyMissing(stream, serverNode.Entity)
+                       return
+               }
+               clientLeafNode = node
+       }
+
+       // if the client's current leaf node is nil, the client side doesn't have this leaf node,
+       // we should send the property missing request to the server
+       if clientLeafNode == nil {
+               r.sendPropertyMissing(stream, serverNode.Entity)
+               return
+       }
+
+       // compare the entity of the server leaf node with the client leaf node
+       entityCompare := strings.Compare(serverNode.Entity, 
clientLeafNode.entity)
+       if entityCompare == 0 {
+               // if the entity is the same, check the sha value
+               if serverNode.Sha != clientLeafNode.shaValue {
+                       r.queryPropertyAndSendToServer(ctx, syncShard, 
serverNode.Entity, stream)
+               }
+               *notProcessingClientNode = nil
+               return
+       } else if entityCompare < 0 {
+               // if the entity of the server leaf node is less than the 
client leaf node,
+               // it means the server leaf node does not exist in the client 
leaf nodes
+               r.sendPropertyMissing(stream, serverNode.Entity)
+               // the client node remains unprocessed, waiting for a matching server node to compare against
+               *notProcessingClientNode = clientLeafNode
+               return
+       }
+       // otherwise, the entity of the server leaf node is greater than the 
client leaf node,
+       // it means the client leaf node does not exist in the server leaf 
nodes,
+
+       // we should query the property from the client side and send it to the 
server
+       r.queryPropertyAndSendToServer(ctx, syncShard, clientLeafNode.entity, 
stream)
+       // clean up the unprocessed node and let the client-side leaf reader keep advancing
+       *notProcessingClientNode = nil
+       // recurse to read the next client leaf node so both sides advance to the same entity
+       r.readingClientNodeAndCompare(ctx, syncShard, serverNode, bufReader, 
clientSlotNode, stream, notProcessingClientNode)
+}
+
+func (r *repairGossipClient) queryPropertyAndSendToServer(
+       ctx context.Context,
+       syncShard *shard,
+       entity string,
+       stream grpclib.BidiStreamingClient[propertyv1.RepairRequest, 
propertyv1.RepairResponse],
+) {
+       // query the property and, if it exists locally, send it to the server
+       property, p, err := r.queryProperty(ctx, syncShard, entity)
+       if err != nil {
+               r.scheduler.l.Warn().Err(err).Msgf("failed to query property 
for leaf node entity %s", entity)
+               return
+       }
+       if property == nil {
+               return
+       }
+       // send the property to the server
+       err = stream.Send(&propertyv1.RepairRequest{
+               Data: &propertyv1.RepairRequest_PropertySync{
+                       PropertySync: &propertyv1.PropertySync{
+                               Id:         GetPropertyID(p),
+                               Property:   p,
+                               DeleteTime: property.deleteTime,
+                       },
+               },
+       })
+       if err != nil {
+               r.scheduler.l.Warn().Err(err).Msgf("failed to send property 
sync request to server, entity: %s", entity)
+       }
+}
+
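+// findSlotNodeByRoot pages through the slot children of the root node until the slot
+// with the requested index is found.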
+func (r *repairGossipClient) findSlotNodeByRoot(reader repairTreeReader, root 
*repairTreeNode, index int32) (*repairTreeNode, error) {
+       firstRead := true
+       for {
+               slotNodes, err := reader.read(root, 
gossipMerkleTreeReadPageSize, firstRead)
+               if err != nil {
+                       return nil, err
+               }
+               if len(slotNodes) == 0 {
+                       return nil, fmt.Errorf("failed to find slot node by 
root: %d", index)
+               }
+               for _, s := range slotNodes {
+                       if s.slotInx == index {
+                               return s, nil
+                       }
+               }
+
+               firstRead = false
+       }
+}
+
+type repairGossipServer struct {
+       propertyv1.UnimplementedRepairServiceServer
+       repairGossipBase
+}
+
+func newRepairGossipServer(s *repairScheduler) *repairGossipServer {
+       return &repairGossipServer{
+               repairGossipBase: repairGossipBase{
+                       scheduler: s,
+               },
+       }
+}
+
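+// Repair is the server side of the bidirectional repair stream: it collects the
+// client's tree summary, compares it against the local Merkle tree, reports differing
+// or missing slots back to the client, and then serves PropertyMissing and PropertySync
+// requests until the client signals there is nothing more to sync.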
+func (r *repairGossipServer) Repair(s 
grpclib.BidiStreamingServer[propertyv1.RepairRequest, 
propertyv1.RepairResponse]) error {
+       summary, reader, err := r.combineTreeSummary(s)
+       if err != nil {
+               return fmt.Errorf("failed to receive tree summary request: %w", 
err)
+       }
+       defer reader.close()
+       // if the trees don't need to be compared, skip the rest of the process
+       if summary.ignoreCompare {
+               r.scheduler.l.Debug().Msgf("tree root for group %s, shard %d is 
the same, skip tree slots", summary.group, summary.shardID)
+               return nil
+       }
+       group := summary.group
+       shardID := summary.shardID
+       var hasPropertyUpdated bool
+       defer func() {
+               if hasPropertyUpdated {
+                       err = 
r.scheduler.buildingTree([]common.ShardID{common.ShardID(shardID)}, group, true)
+                       if err != nil {
+                               r.scheduler.l.Warn().Err(err).Msgf("failed to 
build tree for group %s", group)
+                       }
+               }
+       }()
+
+       serverSlotNodes, err := reader.read(summary.rootNode, 
int64(r.scheduler.treeSlotCount), false)
+       if err != nil {
+               r.scheduler.l.Warn().Err(err).Msgf("failed to read slot nodes 
on server side")
+               return r.sendEmptyDiffer(s)
+       }
+       // client missing slots or server slots with different SHA values
+       clientMismatchSlots := make([]*repairTreeNode, 0)
+       // server missing slots
+       serverMissingSlots := make([]int32, 0)
+       clientSlotMap := make(map[int32]string, len(summary.slots))
+       for _, clientSlot := range summary.slots {
+               clientSlotMap[clientSlot.index] = clientSlot.shaValue
+       }
+       serverSlotSet := make(map[int32]bool, len(serverSlotNodes))
+       for _, serverSlot := range serverSlotNodes {
+               serverSlotSet[serverSlot.slotInx] = true
+               // if the client slot exists with a different SHA value, or the client slot does not exist at all,
+               // add it to the client mismatch slots
+               if clientSha, ok := clientSlotMap[serverSlot.slotInx]; ok && 
clientSha != serverSlot.shaValue {
+                       clientMismatchSlots = append(clientMismatchSlots, 
serverSlot)
+               } else if !ok {
+                       clientMismatchSlots = append(clientMismatchSlots, 
serverSlot)
+               }
+       }
+       // if the client slot exists but the server slot does not exist, we 
should add it to the server missing slots
+       for _, clientSlot := range summary.slots {
+               if _, ok := serverSlotSet[clientSlot.index]; !ok {
+                       serverMissingSlots = append(serverMissingSlots, 
clientSlot.index)
+               }
+       }
+       sent, err := r.sendDifferSlots(reader, clientMismatchSlots, 
serverMissingSlots, s)
+       if err != nil {
+               r.scheduler.l.Warn().Err(err).Msgf("failed to send different 
slots to client")
+       }
+       // send an empty differ summary to signal that no more differing slots will be sent
+       err = r.sendEmptyDiffer(s)
+       if !sent {
+               return err
+       } else if err != nil {
+               // keep the message-receiving loop running
+               r.scheduler.l.Warn().Msgf("failed to send the empty differ summary to client, error: %v", err)
+       }
+       for {
+               missingOrSyncRequest, err := s.Recv()
+               if err != nil {
+                       if errors.Is(err, io.EOF) {
+                               return nil
+                       }
+                       return fmt.Errorf("failed to receive missing or sync 
request: %w", err)
+               }
+               syncShard, err := r.scheduler.db.loadShard(s.Context(), 
common.ShardID(shardID))
+               if err != nil {
+                       return fmt.Errorf("shard %d load failure on server 
side: %w", shardID, err)
+               }
+               switch req := missingOrSyncRequest.Data.(type) {
+               case *propertyv1.RepairRequest_PropertyMissing:
+                       r.processPropertyMissing(s.Context(), syncShard, 
req.PropertyMissing, s)
+               case *propertyv1.RepairRequest_PropertySync:
+                       if r.processPropertySync(s.Context(), syncShard, 
req.PropertySync, s, group) {
+                               hasPropertyUpdated = true
+                       }
+               case *propertyv1.RepairRequest_NoMorePropertySync:
+                       // if the client has no more property sync, the server 
side should stop the sync
+                       return nil
+               }
+       }
+}
+
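+// combineTreeSummary receives the TreeRoot and the following TreeSlots messages from
+// the client and assembles them into a repairTreeSummary, returning early when the root
+// SHA values already match or when the client sends an empty slot list.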
+func (r *repairGossipServer) combineTreeSummary(
+       s grpclib.BidiStreamingServer[propertyv1.RepairRequest, 
propertyv1.RepairResponse],
+) (*repairTreeSummary, repairTreeReader, error) {
+       var summary *repairTreeSummary
+       var reader repairTreeReader
+       for {
+               recvData, err := s.Recv()
+               if err != nil {
+                       r.scheduler.l.Warn().Err(err).Msgf("failed to receive 
tree summary from client")
+                       return nil, nil, err
+               }
+
+               switch data := recvData.Data.(type) {
+               case *propertyv1.RepairRequest_TreeRoot:
+                       summary, reader, err = r.handleTreeRootRequest(s, data)
+                       if err != nil {
+                               if sendError := r.sendRootCompare(s, false, 
false); sendError != nil {
+                                       
r.scheduler.l.Warn().Err(sendError).Msgf("failed to send root compare response 
to client")
+                               }
+                               return nil, nil, err
+                       }
+                       if err = r.sendRootCompare(s, true, 
summary.ignoreCompare); err != nil {
+                               _ = reader.close()
+                               return nil, nil, err
+                       }
+                       if summary.ignoreCompare {
+                               return summary, reader, nil
+                       }
+               case *propertyv1.RepairRequest_TreeSlots:
+                       if len(data.TreeSlots.SlotSha) == 0 {
+                               return summary, reader, nil
+                       }
+                       for _, slot := range data.TreeSlots.SlotSha {
+                               summary.slots = append(summary.slots, 
&repairTreeSummarySlot{
+                                       index:    slot.Slot,
+                                       shaValue: slot.Value,
+                               })
+                       }
+               default:
+                       r.scheduler.l.Warn().Msgf("unexpected data type: %T, 
expected TreeRoot or TreeSlots", data)
+               }
+       }
+}
+
+func (r *repairGossipServer) handleTreeRootRequest(
+       s grpclib.BidiStreamingServer[propertyv1.RepairRequest, 
propertyv1.RepairResponse],
+       req *propertyv1.RepairRequest_TreeRoot,
+) (*repairTreeSummary, repairTreeReader, error) {
+       summary := &repairTreeSummary{
+               group:   req.TreeRoot.Group,
+               shardID: req.TreeRoot.ShardId,
+       }
+
+       reader, exist, err := r.getTreeReader(s.Context(), summary.group, 
summary.shardID)
+       if err != nil || !exist {
+               return nil, nil, fmt.Errorf("tree not found or does not exist for group %s: %v", summary.group, err)
+       }
+       rootNode, err := reader.read(nil, 1, false)
+       if err != nil {
+               _ = reader.close()
+               return nil, nil, fmt.Errorf("failed to read tree root for group 
%s: %w", summary.group, err)
+       }
+       if len(rootNode) == 0 {
+               _ = reader.close()
+               return nil, nil, fmt.Errorf("tree root is empty for group %s", summary.group)
+       }
+       summary.rootNode = rootNode[0]
+       summary.ignoreCompare = req.TreeRoot.RootSha == rootNode[0].shaValue
+       return summary, reader, nil
+}
+
+func (r *repairGossipServer) sendRootCompare(s 
grpclib.BidiStreamingServer[propertyv1.RepairRequest, 
propertyv1.RepairResponse], treeFound, rootMatch bool) error {
+       return s.Send(&propertyv1.RepairResponse{
+               Data: &propertyv1.RepairResponse_RootCompare{
+                       RootCompare: &propertyv1.RootCompare{
+                               TreeFound:    treeFound,
+                               RootShaMatch: rootMatch,
+                       },
+               },
+       })
+}
+
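+// processPropertySync applies a property pushed by the client to the local shard. If
+// the local copy turns out to be newer, that copy is sent back to the client instead.
+// The return value reports whether the local shard was actually updated.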
+func (r *repairGossipServer) processPropertySync(
+       ctx context.Context,
+       syncShard *shard,
+       sync *propertyv1.PropertySync,
+       s grpclib.BidiStreamingServer[propertyv1.RepairRequest, 
propertyv1.RepairResponse],
+       group string,
+) bool {
+       updated, newer, err := syncShard.repair(ctx, sync.Id, sync.Property, 
sync.DeleteTime)
+       if err != nil {
+               r.scheduler.l.Warn().Err(err).Msgf("failed to repair property 
%s from server side", sync.Id)
+               r.scheduler.metrics.totalRepairFailedCount.Inc(1, group, 
fmt.Sprintf("%d", syncShard.id))
+               return false
+       }
+       if updated {
+               r.scheduler.l.Debug().Msgf("successfully repaired property %s 
on server side", sync.Id)
+               r.scheduler.metrics.totalRepairSuccessCount.Inc(1, group, 
fmt.Sprintf("%d", syncShard.id))
+       }
+       if !updated && newer != nil {
+               // if the property hasn't been updated and the newer property is not nil,
+               // the local (server-side) copy is newer than the client's copy,
+               var p propertyv1.Property
+               err = protojson.Unmarshal(newer.source, &p)
+               if err != nil {
+                       r.scheduler.l.Warn().Err(err).Msgf("failed to unmarshal 
property from db by entity %s", newer.id)
+                       return false
+               }
+               // send the newer property to the client
+               err = s.Send(&propertyv1.RepairResponse{
+                       Data: &propertyv1.RepairResponse_PropertySync{
+                               PropertySync: &propertyv1.PropertySync{
+                                       Id:         newer.id,
+                                       Property:   &p,
+                                       DeleteTime: newer.deleteTime,
+                               },
+                       },
+               })
+               if err != nil {
+                       r.scheduler.l.Warn().Err(err).Msgf("failed to send 
newer property sync response to client, entity: %s", newer.id)
+                       return false
+               }
+       }
+       return updated
+}
+
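+// processPropertyMissing answers a client request for a property it lacks by querying
+// the local shard and streaming the latest copy back as a PropertySync response.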
+func (r *repairGossipServer) processPropertyMissing(
+       ctx context.Context,
+       syncShard *shard,
+       missing *propertyv1.PropertyMissing,
+       s grpclib.BidiStreamingServer[propertyv1.RepairRequest, 
propertyv1.RepairResponse],
+) {
+       property, data, err := r.queryProperty(ctx, syncShard, missing.Entity)
+       if err != nil {
+               r.scheduler.l.Warn().Err(err).Msgf("failed to query client 
missing property from server side: %s", missing.Entity)
+               return
+       }
+       if property == nil {
+               return
+       }
+       err = s.Send(&propertyv1.RepairResponse{
+               Data: &propertyv1.RepairResponse_PropertySync{
+                       PropertySync: &propertyv1.PropertySync{
+                               Id:         property.id,
+                               Property:   data,
+                               DeleteTime: property.deleteTime,
+                       },
+               },
+       })
+       if err != nil {
+               r.scheduler.l.Warn().Err(err).Msgf("failed to send property 
sync response to client, entity: %s", missing.Entity)
+               return
+       }
+}
+
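+// sendDifferSlots reports the comparison result to the client: for each slot whose SHA
+// differs or which the client lacks it streams the server's leaf nodes, and for each
+// slot the server lacks it sends a placeholder node with Exists set to false.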
+func (r *repairGossipServer) sendDifferSlots(
+       reader repairTreeReader,
+       clientMismatchSlots []*repairTreeNode,
+       serverMissingSlots []int32,
+       s grpclib.BidiStreamingServer[propertyv1.RepairRequest, 
propertyv1.RepairResponse],
+) (hasSent bool, err error) {
+       var leafNodes []*repairTreeNode
+
+       // send server mismatch slots to the client
+       for _, node := range clientMismatchSlots {
+               for {
+                       leafNodes, err = reader.read(node, 
gossipMerkleTreeReadPageSize, false)
+                       if err != nil {
+                               return hasSent, fmt.Errorf("failed to read leaf 
nodes for slot %d: %w", node.slotInx, err)
+                       }
+                       // if there are no more leaf nodes, we can skip this 
slot
+                       if len(leafNodes) == 0 {
+                               break
+                       }
+                       mismatchLeafNodes := make([]*propertyv1.TreeLeafNode, 
0, len(leafNodes))
+                       for _, leafNode := range leafNodes {
+                               mismatchLeafNodes = append(mismatchLeafNodes, 
&propertyv1.TreeLeafNode{
+                                       SlotIndex: node.slotInx,
+                                       Exists:    true,
+                                       Entity:    leafNode.entity,
+                                       Sha:       leafNode.shaValue,
+                               })
+                       }
+                       // send the leaf nodes to the client
+                       err = s.Send(&propertyv1.RepairResponse{
+                               Data: 
&propertyv1.RepairResponse_DifferTreeSummary{
+                                       DifferTreeSummary: 
&propertyv1.DifferTreeSummary{
+                                               Nodes: mismatchLeafNodes,
+                                       },
+                               },
+                       })
+                       if err != nil {
+                               r.scheduler.l.Warn().Err(err).
+                                       Msgf("failed to send leaf nodes for 
slot %d", node.slotInx)
+                       } else {
+                               hasSent = true
+                       }
+               }
+       }
+
+       // send server missing slots to the client
+       missingSlots := make([]*propertyv1.TreeLeafNode, 0, 
len(serverMissingSlots))
+       for _, missingSlot := range serverMissingSlots {
+               missingSlots = append(missingSlots, &propertyv1.TreeLeafNode{
+                       SlotIndex: missingSlot,
+                       Exists:    false,
+               })
+       }
+       if len(missingSlots) > 0 {
+               // send the missing slots to the client
+               err = s.Send(&propertyv1.RepairResponse{
+                       Data: &propertyv1.RepairResponse_DifferTreeSummary{
+                               DifferTreeSummary: 
&propertyv1.DifferTreeSummary{
+                                       Nodes: missingSlots,
+                               },
+                       },
+               })
+               if err != nil {
+                       r.scheduler.l.Warn().Err(err).
+                               Msgf("failed to send missing slots")
+               } else {
+                       hasSent = true
+               }
+       }
+       return hasSent, nil
+}
+
+func (r *repairGossipServer) sendEmptyDiffer(s 
grpclib.BidiStreamingServer[propertyv1.RepairRequest, 
propertyv1.RepairResponse]) error {
+       return s.Send(&propertyv1.RepairResponse{
+               Data: &propertyv1.RepairResponse_DifferTreeSummary{
+                       DifferTreeSummary: &propertyv1.DifferTreeSummary{},
+               },
+       })
+}
+
+type repairTreeSummary struct {
+       rootNode      *repairTreeNode
+       group         string
+       slots         []*repairTreeSummarySlot
+       shardID       uint32
+       ignoreCompare bool
+}
+
+type repairTreeSummarySlot struct {
+       shaValue string
+       index    int32
+}
+
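+// emptyRepairTreeReader stands in for a tree that has a state file but no data: it
+// exposes only a synthetic root node and no children.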
+type emptyRepairTreeReader struct{}
+
+func (e *emptyRepairTreeReader) read(n *repairTreeNode, _ int64, _ bool) 
([]*repairTreeNode, error) {
+       if n == nil {
+               return []*repairTreeNode{{tp: repairTreeNodeTypeRoot}}, nil
+       }
+       return nil, nil
+}
+
+func (e *emptyRepairTreeReader) close() error {
+       return nil
+}
diff --git a/banyand/property/repair_gossip_test.go 
b/banyand/property/repair_gossip_test.go
new file mode 100644
index 00000000..727d85fb
--- /dev/null
+++ b/banyand/property/repair_gossip_test.go
@@ -0,0 +1,581 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package property
+
+import (
+       "context"
+       "fmt"
+       "net"
+       "path"
+       "path/filepath"
+       "sync"
+       "testing"
+       "time"
+
+       "github.com/onsi/ginkgo/v2"
+       "github.com/onsi/gomega"
+       "go.uber.org/mock/gomock"
+       "go.uber.org/multierr"
+       "google.golang.org/grpc"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/banyand/property/gossip"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/index/inverted"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+)
+
+const (
+       testGroup1 = "test-group1"
+)
+
+func TestPropertyRepairGossip(t *testing.T) {
+       gomega.RegisterFailHandler(ginkgo.Fail)
+       ginkgo.RunSpecs(t, "Property Repair Gossip Suite")
+}
+
+var _ = ginkgo.Describe("Property repair gossip", func() {
+       gomega.Expect(logger.Init(logger.Logging{
+               Env:   "dev",
+               Level: flags.LogLevel,
+       })).To(gomega.Succeed())
+
+       var ctrl *gomock.Controller
+       var nodes []*nodeContext
+       ginkgo.BeforeEach(func() {
+               ctrl = gomock.NewController(ginkgo.GinkgoT())
+               gomega.Expect(ctrl).NotTo(gomega.BeNil(), "gomock controller 
should not be nil")
+       })
+
+       ginkgo.AfterEach(func() {
+               for _, node := range nodes {
+                       if node != nil {
+                               node.stopAll()
+                       }
+               }
+               nodes = nil
+       })
+
+       ginkgo.It("all repair tree not built", func() {
+               startingEachTest(&nodes, ctrl, &testCase{
+                       groups: []group{
+                               {name: testGroup1, shardCount: 2, 
replicasCount: 1},
+                       },
+                       nodes: []node{
+                               {properties: []property{{group: testGroup1, 
shard: 0, id: "1", version: 1}}},
+                               {properties: []property{{group: testGroup1, 
shard: 0, id: "1", version: 2}}},
+                       },
+                       propagation: func(nodes []*nodeContext) error {
+                               return 
nodes[0].messenger.Propagation([]string{nodes[0].nodeID, nodes[1].nodeID}, 
testGroup1, 0)
+                       },
+                       result: func(original []node) []node {
+                               // all nodes should keep their existing 
properties
+                               return original
+                       },
+               })
+       })
+
+       ginkgo.It("all repair tree with client built", func() {
+               startingEachTest(&nodes, ctrl, &testCase{
+                       groups: []group{
+                               {name: testGroup1, shardCount: 2, 
replicasCount: 1},
+                       },
+                       nodes: []node{
+                               {properties: []property{{group: testGroup1, 
shard: 0, id: "1", version: 1}}, treeBuilt: true},
+                               {properties: []property{{group: testGroup1, 
shard: 0, id: "1", version: 2}}},
+                       },
+                       propagation: func(nodes []*nodeContext) error {
+                               return 
nodes[0].messenger.Propagation([]string{nodes[0].nodeID, nodes[1].nodeID}, 
testGroup1, 0)
+                       },
+                       result: func(original []node) []node {
+                               // all nodes should keep their existing 
properties
+                               return original
+                       },
+               })
+       })
+
+       ginkgo.It("gossip two data nodes with client version < server version", 
func() {
+               startingEachTest(&nodes, ctrl, &testCase{
+                       groups: []group{
+                               {name: testGroup1, shardCount: 2, 
replicasCount: 1},
+                       },
+                       nodes: []node{
+                               {properties: []property{{group: testGroup1, 
shard: 0, id: "1", version: 1}}, treeBuilt: true},
+                               {properties: []property{{group: testGroup1, 
shard: 0, id: "1", version: 2}}, treeBuilt: true},
+                       },
+                       propagation: func(nodes []*nodeContext) error {
+                               return 
nodes[0].messenger.Propagation([]string{nodes[0].nodeID, nodes[1].nodeID}, 
testGroup1, 0)
+                       },
+                       result: func(original []node) []node {
+                               // the first node's property should be updated to version 2
+                               original[0].properties[0].version = 2
+                               return original
+                       },
+               })
+       })
+
+       ginkgo.It("gossip two data nodes with client version > server version", func() {
+               startingEachTest(&nodes, ctrl, &testCase{
+                       groups: []group{
+                               {name: testGroup1, shardCount: 2, 
replicasCount: 1},
+                       },
+                       nodes: []node{
+                               {properties: []property{{group: testGroup1, 
shard: 0, id: "1", version: 2}}, treeBuilt: true},
+                               {properties: []property{{group: testGroup1, 
shard: 0, id: "1", version: 1}}, treeBuilt: true},
+                       },
+                       propagation: func(nodes []*nodeContext) error {
+                               return 
nodes[0].messenger.Propagation([]string{nodes[0].nodeID, nodes[1].nodeID}, 
testGroup1, 0)
+                       },
+                       result: func(original []node) []node {
+                               // the second node's property should be updated to version 2
+                               original[1].properties[0].version = 2
+                               return original
+                       },
+               })
+       })
+
+       ginkgo.It("gossip two data nodes with client version = server version", 
func() {
+               startingEachTest(&nodes, ctrl, &testCase{
+                       groups: []group{
+                               {name: testGroup1, shardCount: 2, 
replicasCount: 1},
+                       },
+                       nodes: []node{
+                               {properties: []property{{group: testGroup1, 
shard: 0, id: "1", version: 2}}, treeBuilt: true},
+                               {properties: []property{{group: testGroup1, 
shard: 0, id: "1", version: 2}}, treeBuilt: true},
+                       },
+                       propagation: func(nodes []*nodeContext) error {
+                               return 
nodes[0].messenger.Propagation([]string{nodes[0].nodeID, nodes[1].nodeID}, 
testGroup1, 0)
+                       },
+                       result: func(original []node) []node {
+                               // keep the properties as is, since the 
versions are equal
+                               return original
+                       },
+               })
+       })
+
+       ginkgo.It("gossip two data nodes with client missing but server exist", 
func() {
+               startingEachTest(&nodes, ctrl, &testCase{
+                       groups: []group{
+                               {name: testGroup1, shardCount: 2, 
replicasCount: 1},
+                       },
+                       nodes: []node{
+                               {properties: nil, shardCount: 2, treeBuilt: 
true},
+                               {properties: []property{{group: testGroup1, 
shard: 0, id: "1", version: 2}}, shardCount: 2, treeBuilt: true},
+                       },
+                       propagation: func(nodes []*nodeContext) error {
+                               return 
nodes[0].messenger.Propagation([]string{nodes[0].nodeID, nodes[1].nodeID}, 
testGroup1, 0)
+                       },
+                       result: func(original []node) []node {
+                               // the first node should get the property from 
the second node
+                               original[0].properties = original[1].properties
+                               return original
+                       },
+               })
+       })
+
+       ginkgo.It("gossip two data nodes with client exist but server missing", 
func() {
+               startingEachTest(&nodes, ctrl, &testCase{
+                       groups: []group{
+                               {name: testGroup1, shardCount: 2, 
replicasCount: 1},
+                       },
+                       nodes: []node{
+                               {properties: []property{{group: testGroup1, 
shard: 0, id: "1", version: 2}}, shardCount: 2, treeBuilt: true},
+                               {properties: nil, shardCount: 2, treeBuilt: 
true},
+                       },
+                       propagation: func(nodes []*nodeContext) error {
+                               return 
nodes[0].messenger.Propagation([]string{nodes[0].nodeID, nodes[1].nodeID}, 
testGroup1, 0)
+                       },
+                       result: func(original []node) []node {
+                               // the second node should get the property from 
the first node
+                               original[1].properties = original[0].properties
+                               return original
+                       },
+               })
+       })
+
+       ginkgo.It("gossip only repair one shard", func() {
+               startingEachTest(&nodes, ctrl, &testCase{
+                       groups: []group{
+                               {name: testGroup1, shardCount: 2, 
replicasCount: 1},
+                       },
+                       nodes: []node{
+                               {properties: []property{{group: testGroup1, 
shard: 0, id: "1", version: 1}}, shardCount: 2, treeBuilt: true},
+                               {properties: []property{{group: testGroup1, 
shard: 1, id: "2", version: 2}}, treeBuilt: true},
+                       },
+                       propagation: func(nodes []*nodeContext) error {
+                               return 
nodes[0].messenger.Propagation([]string{nodes[0].nodeID, nodes[1].nodeID}, 
testGroup1, 1)
+                       },
+                       result: func(original []node) []node {
+                               // the first node should sync the property from 
the second node
+                               original[0].properties = 
append(original[0].properties, original[1].properties[0])
+                               return original
+                       },
+               })
+       })
+
+       ginkgo.It("gossip with three nodes", func() {
+               startingEachTest(&nodes, ctrl, &testCase{
+                       groups: []group{
+                               {name: testGroup1, shardCount: 2, 
replicasCount: 1},
+                       },
+                       nodes: []node{
+                               {properties: []property{{group: testGroup1, 
shard: 0, id: "1", version: 1}}, treeBuilt: true},
+                               {properties: []property{{group: testGroup1, 
shard: 0, id: "1", version: 2}}, treeBuilt: true},
+                               {properties: []property{{group: testGroup1, 
shard: 0, id: "1", version: 3}}, treeBuilt: true},
+                       },
+                       propagation: func(nodes []*nodeContext) error {
+                               return 
nodes[0].messenger.Propagation([]string{nodes[0].nodeID, nodes[1].nodeID, 
nodes[2].nodeID}, testGroup1, 0)
+                       },
+                       result: func(original []node) []node {
+                               original[0].properties[0] = 
original[2].properties[0]
+                               original[1].properties[0] = 
original[2].properties[0]
+                               return original
+                       },
+               })
+       })
+
+       ginkgo.It("gossip with three nodes and only one slot", func() {
+               startingEachTest(&nodes, ctrl, &testCase{
+                       groups: []group{
+                               {name: testGroup1, shardCount: 2, 
replicasCount: 3},
+                       },
+                       nodes: []node{
+                               {properties: []property{{group: testGroup1, 
shard: 0, id: "2", version: 1}}, treeBuilt: true, treeSlotCount: 1},
+                               {properties: []property{{group: testGroup1, 
shard: 0, id: "1", version: 1}}, treeBuilt: true, treeSlotCount: 1},
+                               {properties: []property{{group: testGroup1, 
shard: 0, id: "3", version: 1}}, treeBuilt: true, treeSlotCount: 1},
+                       },
+                       propagation: func(nodes []*nodeContext) error {
+                               return 
nodes[0].messenger.Propagation([]string{nodes[0].nodeID, nodes[1].nodeID, 
nodes[2].nodeID}, testGroup1, 0)
+                       },
+                       result: func(original []node) []node {
+                               // all nodes should end up with all properties
+                               original[0].properties = 
append(original[0].properties, original[1].properties[0])
+                               original[0].properties = 
append(original[0].properties, original[2].properties[0])
+
+                               original[1].properties = 
append(original[1].properties, original[0].properties[0])
+                               original[1].properties = 
append(original[1].properties, original[2].properties[0])
+
+                               original[2].properties = 
append(original[2].properties, original[0].properties[0])
+                               original[2].properties = 
append(original[2].properties, original[1].properties[0])
+                               return original
+                       },
+               })
+       })
+})
+
+type testCase struct {
+       propagation func(nodes []*nodeContext) error
+       result      func(original []node) []node
+       groups      []group
+       nodes       []node
+}
+
+func startingEachTest(nodes *[]*nodeContext, ctrl *gomock.Controller, c 
*testCase) {
+       *nodes = startDataNodes(ctrl, c.nodes, c.groups)
+
+       // attach the sync-once channel to each node context to make sure the gossip propagation has run at least once
+       once := sync.Once{}
+       leastOnceChannel := make(chan struct{})
+       for _, n := range *nodes {
+               n.clientWrapper.once = &once
+               n.clientWrapper.c = leastOnceChannel
+       }
+       err := c.propagation(*nodes)
+       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+       // make sure the gossip server is synced at least once
+       gomega.Eventually(leastOnceChannel, 
flags.EventuallyTimeout).Should(gomega.BeClosed())
+
+       original := nodeContextToParentSlice(*nodes)
+       updatedResult := c.result(original)
+       for inx, r := range updatedResult {
+               relatedCtx := (*nodes)[inx]
+               for _, updatedProperty := range r.properties {
+                       queryPropertyWithVerify(relatedCtx.database, 
updatedProperty)
+               }
+       }
+}
+
+func startDataNodes(ctrl *gomock.Controller, nodes []node, groups []group) 
[]*nodeContext {
+       result := make([]*nodeContext, 0, len(nodes))
+       for _, n := range nodes {
+               result = append(result, startEachNode(ctrl, n, groups))
+       }
+
+       // registering the node in the gossip system
+       for _, m := range result {
+               for _, n := range result {
+                       
m.messenger.(schema.EventHandler).OnAddOrUpdate(schema.Metadata{
+                               TypeMeta: schema.TypeMeta{
+                                       Name: n.nodeID,
+                                       Kind: schema.KindNode,
+                               },
+                               Spec: &databasev1.Node{
+                                       Metadata: &commonv1.Metadata{
+                                               Name: n.nodeID,
+                                       },
+                                       Roles:       
[]databasev1.Role{databasev1.Role_ROLE_DATA},
+                                       GrpcAddress: n.nodeID,
+
+                                       PropertyRepairGossipGrpcAddress: 
n.nodeID,
+                               },
+                       })
+               }
+       }
+       return result
+}
+
+func startEachNode(ctrl *gomock.Controller, node node, groups []group) 
*nodeContext {
+       if node.treeSlotCount == 0 {
+               node.treeSlotCount = 32 // default value for tree slot count
+       }
+       result := &nodeContext{node: node}
+       dbLocation, dbLocationDefer, err := test.NewSpace()
+       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+       result.appendStop(dbLocationDefer)
+       repairLocation, repairLocationDefer, err := test.NewSpace()
+       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+       result.appendStop(repairLocationDefer)
+       mockGroup := schema.NewMockGroup(ctrl)
+       groupDefines := make([]*commonv1.Group, 0, len(groups))
+       for _, g := range groups {
+               groupDefines = append(groupDefines, &commonv1.Group{
+                       Metadata: &commonv1.Metadata{
+                               Group: g.name,
+                               Name:  g.name,
+                       },
+                       ResourceOpts: &commonv1.ResourceOpts{
+                               ShardNum: uint32(g.shardCount),
+                               Replicas: uint32(g.replicasCount),
+                       },
+               })
+       }
+       mockGroup.EXPECT().ListGroup(gomock.Any()).Return(groupDefines, 
nil).AnyTimes()
+
+       mockRepo := metadata.NewMockRepo(ctrl)
+       mockRepo.EXPECT().RegisterHandler("", schema.KindGroup, 
gomock.Any()).MaxTimes(1)
+       mockRepo.EXPECT().GroupRegistry().Return(mockGroup).AnyTimes()
+
+       ports, err := test.AllocateFreePorts(1)
+       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+       messenger := 
gossip.NewMessengerWithoutMetadata(observability.NewBypassRegistry(), ports[0])
+       addr := fmt.Sprintf("127.0.0.1:%d", ports[0])
+       result.nodeID = addr
+       err = messenger.Validate()
+       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+       
messenger.(run.PreRunner).PreRun(context.WithValue(context.Background(), 
common.ContextNodeKey, common.Node{
+               NodeID:                    addr,
+               PropertyGossipGrpcAddress: addr,
+       }))
+
+       ctx := context.WithValue(context.Background(), common.ContextNodeKey, 
common.Node{
+               NodeID: addr,
+       })
+       var db *database
+       db, err = openDB(ctx,
+               dbLocation, time.Minute*10, time.Minute*10, 
int(node.treeSlotCount), observability.NewBypassRegistry(),
+               fs.NewLocalFileSystem(), true, repairLocation, "@every 10m", 
time.Minute*10, "* 2 * * *",
+               messenger, mockRepo, func(context.Context) (string, error) {
+                       snapshotDir, defFunc, newSpaceErr := test.NewSpace()
+                       if newSpaceErr != nil {
+                               return "", newSpaceErr
+                       }
+                       result.appendStop(defFunc)
+                       sList := db.sLst.Load()
+                       var snpError error
+                       for _, s := range *sList {
+                               snpDir := path.Join(snapshotDir, 
filepath.Base(s.location))
+                               lfs.MkdirPanicIfExist(snpDir, storage.DirPerm)
+                               if e := s.store.TakeFileSnapshot(snpDir); e != 
nil {
+                                       snpError = multierr.Append(snpError, e)
+                               }
+                       }
+                       return snapshotDir, snpError
+               })
+       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+       result.database = db
+
+       // register the server with the gossip messenger and wrap the client to capture the sync status
+       
messenger.RegisterServices(result.database.repairScheduler.registerServerToGossip())
+       gossipClient := &repairGossipClientWrapper{repairGossipClient: 
newRepairGossipClient(result.database.repairScheduler)}
+       result.clientWrapper = gossipClient
+       messenger.Subscribe(gossipClient)
+       db.repairScheduler.registerClientToGossip(messenger)
+
+       messenger.Serve(make(chan struct{}))
+       result.messenger = messenger
+
+       // check gossip server is up
+       gomega.Eventually(func() error {
+               conn, connectErr := net.DialTimeout("tcp", addr, time.Second*2)
+               if connectErr == nil {
+                       _ = conn.Close()
+               }
+               return connectErr
+       }, flags.EventuallyTimeout).Should(gomega.Succeed())
+
+       result.appendStop(messenger.GracefulStop)
+
+       // initialize shards in the db
+       for i := int32(0); i < node.shardCount; i++ {
+               shardID := common.ShardID(i)
+               _, err = db.loadShard(context.Background(), shardID)
+               gomega.Expect(err).NotTo(gomega.HaveOccurred())
+       }
+
+       // adding data to the node
+       for _, p := range node.properties {
+               applyPropertyUpdate(db, p)
+       }
+
+       if node.treeBuilt {
+               // building the gossip tree for the node
+               err = db.repairScheduler.buildingTree(nil, "", true)
+               gomega.Expect(err).NotTo(gomega.HaveOccurred())
+       }
+
+       return result
+}
+
+type repairGossipClientWrapper struct {
+       *repairGossipClient
+       c    chan struct{}
+       once *sync.Once
+}
+
+func (w *repairGossipClientWrapper) Rev(ctx context.Context, nextNode 
*grpc.ClientConn, request *propertyv1.PropagationRequest) error {
+       err := w.repairGossipClient.Rev(ctx, nextNode, request)
+       w.once.Do(func() {
+               close(w.c)
+       })
+       return err
+}
+
+func applyPropertyUpdate(db *database, p property) {
+       s, err := db.loadShard(context.Background(), common.ShardID(p.shard))
+       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+       update := &propertyv1.Property{
+               Metadata: &commonv1.Metadata{
+                       Group:       p.group,
+                       Name:        "test-name",
+                       ModRevision: p.version,
+               },
+               Id: p.id,
+       }
+       if p.deleted {
+               err = s.delete(context.Background(), 
[][]byte{GetPropertyID(update)})
+       } else {
+               err = s.update(GetPropertyID(update), update)
+       }
+       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+}
+
+func queryPropertyWithVerify(db *database, p property) {
+       s, err := db.loadShard(context.Background(), common.ShardID(p.shard))
+       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+       query, err := inverted.BuildPropertyQuery(&propertyv1.QueryRequest{
+               Groups: []string{p.group},
+               Name:   "test-name",
+               Ids:    []string{p.id},
+       }, groupField, entityID)
+       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+       gomega.Eventually(func() *property {
+               dataList, err := s.search(context.Background(), query, 10)
+               if err != nil {
+                       return nil
+               }
+               var latestData *queryProperty
+               for _, data := range dataList {
+                       if latestData == nil || data.timestamp > 
latestData.timestamp {
+                               latestData = data
+                       }
+               }
+               if latestData == nil {
+                       return nil
+               }
+               return &property{group: p.group, shard: p.shard, id: p.id, 
version: latestData.timestamp, deleted: latestData.deleteTime > 0}
+       }, flags.EventuallyTimeout).Should(gomega.Equal(&p))
+}
+
+type group struct {
+       name          string
+       shardCount    int32
+       replicasCount int32
+}
+
+type node struct {
+       properties []property
+       treeBuilt  bool
+       // for the no data scenario, we need to specify the shard count
+       shardCount    int32
+       treeSlotCount int32
+}
+
+type property struct {
+       group   string
+       id      string
+       version int64
+       shard   int32
+       deleted bool
+}
+
+type nodeContext struct {
+       messenger     gossip.Messenger
+       database      *database
+       clientWrapper *repairGossipClientWrapper
+       nodeID        string
+       stop          []func()
+       node
+       stopMutex sync.RWMutex
+}
+
+func (n *nodeContext) appendStop(f func()) {
+       n.stopMutex.Lock()
+       defer n.stopMutex.Unlock()
+       n.stop = append(n.stop, f)
+}
+
+func (n *nodeContext) stopAll() {
+       n.stopMutex.RLock()
+       result := make([]func(), 0, len(n.stop))
+       result = append(result, n.stop...)
+       n.stopMutex.RUnlock()
+       for _, f := range result {
+               f()
+       }
+}
+
+func nodeContextToParentSlice(ncs []*nodeContext) []node {
+       nodes := make([]node, 0, len(ncs))
+       for _, nc := range ncs {
+               nodes = append(nodes, nc.node)
+       }
+       return nodes
+}
diff --git a/banyand/property/repair_test.go b/banyand/property/repair_test.go
index 172cd351..bc795b57 100644
--- a/banyand/property/repair_test.go
+++ b/banyand/property/repair_test.go
@@ -199,8 +199,8 @@ func TestBuildTree(t *testing.T) {
                        }
                        defers = append(defers, snapshotDeferFunc)
                        db, err := openDB(context.Background(), dataDir, 
3*time.Second, time.Hour, 32,
-                               observability.BypassRegistry, 
fs.NewLocalFileSystem(), snapshotDir,
-                               "@every 10m", time.Second*10, 
func(context.Context) (string, error) {
+                               observability.BypassRegistry, 
fs.NewLocalFileSystem(), true, snapshotDir,
+                               "@every 10m", time.Second*10, "* 2 * * *", nil, 
nil, func(context.Context) (string, error) {
                                        snapshotDir, defFunc, newSpaceErr := 
test.NewSpace()
                                        if newSpaceErr != nil {
                                                return "", newSpaceErr
@@ -231,7 +231,7 @@ func TestBuildTree(t *testing.T) {
                                }
                        }
 
-                       err = newShard.repairState.scheduler.doRepair()
+                       err = newShard.repairState.scheduler.doBuildTree()
                        if err != nil {
                                t.Fatalf("failed to build status: %v", err)
                        }
@@ -254,7 +254,7 @@ func TestBuildTree(t *testing.T) {
                        }
 
                        if tt.nextStatusVerify != nil {
-                               err := newShard.repairState.scheduler.doRepair()
+                               err := 
newShard.repairState.scheduler.doBuildTree()
                                if err != nil {
                                        t.Fatalf("failed to build status after 
update: %v", err)
                                }
@@ -289,8 +289,8 @@ func TestDocumentUpdatesNotify(t *testing.T) {
        }
        defers = append(defers, snapshotDeferFunc)
        db, err := openDB(context.Background(), dataDir, 3*time.Second, 
time.Hour, 32,
-               observability.BypassRegistry, fs.NewLocalFileSystem(), 
snapshotDir,
-               "@every 10m", time.Millisecond*50, func(context.Context) 
(string, error) {
+               observability.BypassRegistry, fs.NewLocalFileSystem(), true, 
snapshotDir,
+               "@every 10m", time.Millisecond*50, "* 2 * * *", nil, nil, 
func(context.Context) (string, error) {
                        tmpDir, defFunc, newSpaceErr := test.NewSpace()
                        if newSpaceErr != nil {
                                return "", newSpaceErr
@@ -523,7 +523,7 @@ func (r *repairData) readTree(t *testing.T, group string) 
*repairTestTree {
                _ = reader.close()
        }()
 
-       roots, err := reader.read(nil, 10)
+       roots, err := reader.read(nil, 10, false)
        if err != nil {
                t.Fatalf("failed to read tree for group %s: %v", group, err)
        }
@@ -536,7 +536,7 @@ func (r *repairData) readTree(t *testing.T, group string) 
*repairTestTree {
                        shaValue: roots[0].shaValue,
                },
        }
-       slots, err := reader.read(roots[0], 10)
+       slots, err := reader.read(roots[0], 10, false)
        if err != nil {
                t.Fatalf("failed to read slots for group %s: %v", group, err)
        }
@@ -545,20 +545,20 @@ func (r *repairData) readTree(t *testing.T, group string) 
*repairTestTree {
        }
        for _, slot := range slots {
                slotNode := &repairTestTreeNode{
-                       id:       slot.id,
+                       id:       fmt.Sprintf("%d", slot.slotInx),
                        shaValue: slot.shaValue,
                }
                tree.root.children = append(tree.root.children, slotNode)
-               children, err := reader.read(slot, 10)
+               children, err := reader.read(slot, 10, false)
                if err != nil {
-                       t.Fatalf("failed to read children for slot %s in group 
%s: %v", slot.id, group, err)
+                       t.Fatalf("failed to read children for slot %d in group 
%s: %v", slot.slotInx, group, err)
                }
                if len(children) == 0 {
-                       t.Fatalf("expected at least one child for slot %s in 
group %s, but got none", slot.id, group)
+                       t.Fatalf("expected at least one child for slot %d in 
group %s, but got none", slot.slotInx, group)
                }
                for _, child := range children {
                        childNode := &repairTestTreeNode{
-                               id:       child.id,
+                               id:       child.entity,
                                shaValue: child.shaValue,
                        }
                        slotNode.children = append(slotNode.children, childNode)
diff --git a/banyand/property/service.go b/banyand/property/service.go
index 2c99a523..bc5892e8 100644
--- a/banyand/property/service.go
+++ b/banyand/property/service.go
@@ -59,21 +59,23 @@ type service struct {
        gossipMessenger          gossip.Messenger
        omr                      observability.MetricsRegistry
        lfs                      fs.FileSystem
+       pm                       protector.Memory
        close                    chan struct{}
        db                       *database
        l                        *logger.Logger
-       pm                       protector.Memory
-       root                     string
        nodeID                   string
+       root                     string
        snapshotDir              string
        repairDir                string
        repairBuildTreeCron      string
+       repairTriggerCron        string
        flushTimeout             time.Duration
        expireTimeout            time.Duration
        repairQuickBuildTreeTime time.Duration
        repairTreeSlotCount      int
        maxDiskUsagePercent      int
        maxFileSnapshotNum       int
+       repairEnabled            bool
 }
 
 func (s *service) FlagSet() *run.FlagSet {
@@ -87,6 +89,8 @@ func (s *service) FlagSet() *run.FlagSet {
        flagS.StringVar(&s.repairBuildTreeCron, 
"property-repair-build-tree-cron", "@every 1h", "the cron expression for 
repairing the build tree")
        flagS.DurationVar(&s.repairQuickBuildTreeTime, 
"property-repair-quick-build-tree-time", time.Minute*10,
                "the duration of the quick build tree after operate the 
property")
+       flagS.StringVar(&s.repairTriggerCron, "property-repair-trigger-cron", 
"* 2 * * *", "the cron expression for background repairing the property data")
+       flagS.BoolVar(&s.repairEnabled, "property-repair-enabled", false, 
"whether to enable the background property repair")
        s.gossipMessenger.FlagSet().VisitAll(func(f *pflag.Flag) {
                flagS.AddFlag(f)
        })
@@ -134,31 +138,34 @@ func (s *service) PreRun(ctx context.Context) error {
        }
        node := val.(common.Node)
        s.nodeID = node.NodeID
-       // if the gossip address is empty, it means that the gossip is not 
enabled.
-       if node.PropertyGossipGrpcAddress == "" {
-               s.gossipMessenger = nil
-       }
 
        var err error
        snapshotLis := &snapshotListener{s: s}
        s.db, err = openDB(ctx, filepath.Join(path, storage.DataDir), 
s.flushTimeout, s.expireTimeout, s.repairTreeSlotCount, s.omr, s.lfs,
-               s.repairDir, s.repairBuildTreeCron, s.repairQuickBuildTreeTime, 
func(ctx context.Context) (string, error) {
+               s.repairEnabled, s.repairDir, s.repairBuildTreeCron, 
s.repairQuickBuildTreeTime, s.repairTriggerCron, s.gossipMessenger, s.metadata,
+               func(ctx context.Context) (string, error) {
                        res := snapshotLis.Rev(ctx,
                                
bus.NewMessage(bus.MessageID(time.Now().UnixNano()), 
[]*databasev1.SnapshotRequest_Group{}))
                        snpMsg := res.Data().(*databasev1.Snapshot)
                        if snpMsg.Error != "" {
                                return "", errors.New(snpMsg.Error)
                        }
-                       return snpMsg.Name, nil
+                       return filepath.Join(snapshotLis.s.snapshotDir, 
snpMsg.Name, storage.DataDir), nil
                })
        if err != nil {
                return err
        }
 
+       // if the gossip address is empty or the repair scheduler is not started, it means that gossip is not enabled.
+       if node.PropertyGossipGrpcAddress == "" || s.db.repairScheduler == nil {
+               s.gossipMessenger = nil
+       }
        if s.gossipMessenger != nil {
                if err = s.gossipMessenger.PreRun(ctx); err != nil {
                        return err
                }
+               
s.gossipMessenger.RegisterServices(s.db.repairScheduler.registerServerToGossip())
+               s.db.repairScheduler.registerClientToGossip(s.gossipMessenger)
        }
        return multierr.Combine(
                s.pipeline.Subscribe(data.TopicPropertyUpdate, 
&updateListener{s: s, path: path, maxDiskUsagePercent: s.maxDiskUsagePercent}),
diff --git a/banyand/property/shard.go b/banyand/property/shard.go
index ab9059fa..9d5895fe 100644
--- a/banyand/property/shard.go
+++ b/banyand/property/shard.go
@@ -241,7 +241,9 @@ func (s *shard) updateDocuments(docs index.Documents) error 
{
        if persistentError != nil {
                return fmt.Errorf("persistent failure: %w", persistentError)
        }
-       s.repairState.scheduler.documentUpdatesNotify()
+       if s.repairState.scheduler != nil {
+               s.repairState.scheduler.documentUpdatesNotify()
+       }
        return nil
 }
 
@@ -286,18 +288,18 @@ func (s *shard) search(ctx context.Context, indexQuery 
index.Query, limit int,
        return data, nil
 }
 
-func (s *shard) repair(ctx context.Context, id []byte, property 
*propertyv1.Property, deleteTime int64) error {
+func (s *shard) repair(ctx context.Context, id []byte, property 
*propertyv1.Property, deleteTime int64) (updated bool, selfNewer 
*queryProperty, err error) {
        iq, err := inverted.BuildPropertyQuery(&propertyv1.QueryRequest{
                Groups: []string{property.Metadata.Group},
                Name:   property.Metadata.Name,
                Ids:    []string{property.Id},
        }, groupField, entityID)
        if err != nil {
-               return fmt.Errorf("build property query failure: %w", err)
+               return false, nil, fmt.Errorf("build property query failure: 
%w", err)
        }
        olderProperties, err := s.search(ctx, iq, 100)
        if err != nil {
-               return fmt.Errorf("query older properties failed: %w", err)
+               return false, nil, fmt.Errorf("query older properties failed: 
%w", err)
        }
        sort.Sort(queryPropertySlice(olderProperties))
        // if there no older properties, we can insert the latest document.
@@ -305,35 +307,41 @@ func (s *shard) repair(ctx context.Context, id []byte, 
property *propertyv1.Prop
                var doc *index.Document
                doc, err = s.buildUpdateDocument(id, property, deleteTime)
                if err != nil {
-                       return fmt.Errorf("build update document failed: %w", 
err)
+                       return false, nil, fmt.Errorf("build update document 
failed: %w", err)
+               }
+               err = s.updateDocuments(index.Documents{*doc})
+               if err != nil {
+                       return false, nil, fmt.Errorf("update document failed: 
%w", err)
                }
-               return s.updateDocuments(index.Documents{*doc})
+               return true, nil, nil
        }
 
        // if the lastest property in shard is bigger than the repaired 
property,
        // then the repaired process should be stopped.
-       if olderProperties[len(olderProperties)-1].timestamp > 
property.Metadata.ModRevision {
-               return nil
+       if (olderProperties[len(olderProperties)-1].timestamp > 
property.Metadata.ModRevision) ||
+               olderProperties[len(olderProperties)-1].timestamp == 
property.Metadata.ModRevision &&
+                       olderProperties[len(olderProperties)-1].deleteTime == 
deleteTime {
+               return false, olderProperties[len(olderProperties)-1], nil
        }
 
        docIDList := s.buildNotDeletedDocIDList(olderProperties)
        deletedDocuments, err := s.buildDeleteFromTimeDocuments(ctx, docIDList, 
time.Now().UnixNano())
        if err != nil {
-               return fmt.Errorf("build delete older documents failed: %w", 
err)
+               return false, nil, fmt.Errorf("build delete older documents 
failed: %w", err)
        }
        // update the property to mark it as delete
        updateDoc, err := s.buildUpdateDocument(id, property, deleteTime)
        if err != nil {
-               return fmt.Errorf("build repair document failure: %w", err)
+               return false, nil, fmt.Errorf("build repair document failure: 
%w", err)
        }
        result := make([]index.Document, 0, len(deletedDocuments)+1)
        result = append(result, deletedDocuments...)
        result = append(result, *updateDoc)
        err = s.updateDocuments(result)
        if err != nil {
-               return fmt.Errorf("update documents failed: %w", err)
+               return false, nil, fmt.Errorf("update documents failed: %w", 
err)
        }
-       return nil
+       return true, nil, nil
 }
 
 func (s *shard) buildNotDeletedDocIDList(properties []*queryProperty) [][]byte 
{
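For readers scanning the hunk above: `shard.repair` now reports whether the local copy was rewritten and, when the local data is newer (or identical), hands back that newer document instead of silently returning. A minimal caller sketch, assuming a hypothetical `repairFromPeer` helper inside the `property` package (not the actual scheduler code):

```go
// repairFromPeer is a hypothetical illustration of the new contract:
// apply a property received from a gossip peer and return the locally
// newer (or identical) copy, if any, so the caller can send it back.
func repairFromPeer(ctx context.Context, s *shard, p *propertyv1.Property, deleteTime int64) (*queryProperty, error) {
	updated, selfNewer, err := s.repair(ctx, GetPropertyID(p), p, deleteTime)
	if err != nil {
		return nil, fmt.Errorf("repair property failure: %w", err)
	}
	if updated {
		// the peer's copy won; the local shard has been rewritten
		return nil, nil
	}
	// the local copy is newer or equal, so the peer should repair itself with it
	return selfNewer, nil
}
```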
diff --git a/banyand/property/shard_test.go b/banyand/property/shard_test.go
index 4b986b36..0a331cb4 100644
--- a/banyand/property/shard_test.go
+++ b/banyand/property/shard_test.go
@@ -102,7 +102,7 @@ func TestMergeDeleted(t *testing.T) {
                        }
                        defers = append(defers, snapshotDeferFunc)
                        db, err := openDB(context.Background(), dataDir, 
3*time.Second, tt.expireDeletionTime, 32, observability.BypassRegistry, 
fs.NewLocalFileSystem(),
-                               snapshotDir, "@every 10m", time.Second*10, nil)
+                               true, snapshotDir, "@every 10m", 
time.Second*10, "* 2 * * *", nil, nil, nil)
                        if err != nil {
                                t.Fatal(err)
                        }
@@ -161,13 +161,13 @@ func TestRepair(t *testing.T) {
        now := time.Now()
        tests := []struct {
                beforeApply func() []*propertyv1.Property
-               repair      func(ctx context.Context, s *shard) error
+               repair      func(ctx context.Context, s *shard) (bool, 
*queryProperty, error)
                verify      func(t *testing.T, ctx context.Context, s *shard) 
error
                name        string
        }{
                {
                        name: "repair normal with no properties",
-                       repair: func(ctx context.Context, s *shard) error {
+                       repair: func(ctx context.Context, s *shard) (bool, 
*queryProperty, error) {
                                property := 
generateProperty(fmt.Sprintf("test-id%d", 0), now.UnixNano(), 0)
                                return s.repair(ctx, GetPropertyID(property), 
property, 0)
                        },
@@ -182,7 +182,7 @@ func TestRepair(t *testing.T) {
                },
                {
                        name: "repair deleted with no properties",
-                       repair: func(ctx context.Context, s *shard) error {
+                       repair: func(ctx context.Context, s *shard) (bool, 
*queryProperty, error) {
                                property := 
generateProperty(fmt.Sprintf("test-id%d", 0), now.UnixNano(), 0)
                                return s.repair(ctx, GetPropertyID(property), 
property, now.UnixNano())
                        },
@@ -203,7 +203,7 @@ func TestRepair(t *testing.T) {
                                }
                                return
                        },
-                       repair: func(ctx context.Context, s *shard) error {
+                       repair: func(ctx context.Context, s *shard) (bool, 
*queryProperty, error) {
                                property := generateProperty("test-id3", 
now.UnixNano(), 1000)
                                return s.repair(ctx, GetPropertyID(property), 
property, 0)
                        },
@@ -225,7 +225,7 @@ func TestRepair(t *testing.T) {
                                        generateProperty("test-id", 
now.UnixNano()-100, 0),
                                }
                        },
-                       repair: func(ctx context.Context, s *shard) error {
+                       repair: func(ctx context.Context, s *shard) (bool, 
*queryProperty, error) {
                                property := generateProperty("test-id", 
now.UnixNano(), 1000)
                                return s.repair(ctx, GetPropertyID(property), 
property, now.UnixNano())
                        },
@@ -248,7 +248,7 @@ func TestRepair(t *testing.T) {
                                }
                                return res
                        },
-                       repair: func(ctx context.Context, s *shard) error {
+                       repair: func(ctx context.Context, s *shard) (bool, 
*queryProperty, error) {
                                // Create a property with an older mod revision
                                property := generateProperty("test-id", 
now.UnixNano()-1000, 2000)
                                return s.repair(ctx, GetPropertyID(property), 
property, 0)
@@ -273,7 +273,7 @@ func TestRepair(t *testing.T) {
                                        generateProperty("test-id", 
now.UnixNano(), 0),
                                }
                        },
-                       repair: func(ctx context.Context, s *shard) error {
+                       repair: func(ctx context.Context, s *shard) (bool, 
*queryProperty, error) {
                                // Create a property with the same data but 
marked as deleted
                                property := generateProperty("test-id", 
now.UnixNano(), 0)
                                return s.repair(ctx, GetPropertyID(property), 
property, now.UnixNano())
@@ -311,7 +311,7 @@ func TestRepair(t *testing.T) {
                        }
                        defers = append(defers, snapshotDeferFunc)
                        db, err := openDB(context.Background(), dataDir, 
3*time.Second, 1*time.Hour, 32, observability.BypassRegistry, 
fs.NewLocalFileSystem(),
-                               snapshotDir, "@every 10m", time.Second*10, nil)
+                               true, snapshotDir, "@every 10m", 
time.Second*10, "* 2 * * *", nil, nil, nil)
                        if err != nil {
                                t.Fatal(err)
                        }
@@ -333,7 +333,7 @@ func TestRepair(t *testing.T) {
                                }
                        }
 
-                       if err = tt.repair(context.Background(), newShard); err 
!= nil {
+                       if _, _, err = tt.repair(context.Background(), 
newShard); err != nil {
                                t.Fatal(err)
                        }
 
diff --git a/docs/api-reference.md b/docs/api-reference.md
index 38b3dfb0..dd088e13 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -255,6 +255,21 @@
 - [banyandb/property/v1/property.proto](#banyandb_property_v1_property-proto)
     - [Property](#banyandb-property-v1-Property)
   
+- [banyandb/property/v1/repair.proto](#banyandb_property_v1_repair-proto)
+    - [DifferTreeSummary](#banyandb-property-v1-DifferTreeSummary)
+    - [NoMorePropertySync](#banyandb-property-v1-NoMorePropertySync)
+    - [PropertyMissing](#banyandb-property-v1-PropertyMissing)
+    - [PropertySync](#banyandb-property-v1-PropertySync)
+    - [RepairRequest](#banyandb-property-v1-RepairRequest)
+    - [RepairResponse](#banyandb-property-v1-RepairResponse)
+    - [RootCompare](#banyandb-property-v1-RootCompare)
+    - [TreeLeafNode](#banyandb-property-v1-TreeLeafNode)
+    - [TreeRoot](#banyandb-property-v1-TreeRoot)
+    - [TreeSlotSHA](#banyandb-property-v1-TreeSlotSHA)
+    - [TreeSlots](#banyandb-property-v1-TreeSlots)
+  
+    - [RepairService](#banyandb-property-v1-RepairService)
+  
 - [banyandb/property/v1/rpc.proto](#banyandb_property_v1_rpc-proto)
     - [ApplyRequest](#banyandb-property-v1-ApplyRequest)
     - [ApplyResponse](#banyandb-property-v1-ApplyResponse)
@@ -3771,6 +3786,7 @@ WriteResponse is the response contract for write
 | ----- | ---- | ----- | ----------- |
 | context | [PropagationContext](#banyandb-property-v1-PropagationContext) |  
|  |
 | group | [string](#string) |  |  |
+| shard_id | [uint32](#uint32) |  |  |
 
 
 
@@ -3840,6 +3856,207 @@ Property stores the user defined data
 
 
 
+<a name="banyandb_property_v1_repair-proto"></a>
+<p align="right"><a href="#top">Top</a></p>
+
+## banyandb/property/v1/repair.proto
+
+
+
+<a name="banyandb-property-v1-DifferTreeSummary"></a>
+
+### DifferTreeSummary
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| nodes | [TreeLeafNode](#banyandb-property-v1-TreeLeafNode) | repeated | if nodes is empty, it means the server side has no more tree leaf nodes to send. |
+
+
+
+
+
+
+<a name="banyandb-property-v1-NoMorePropertySync"></a>
+
+### NoMorePropertySync
+
+
+
+
+
+
+
+<a name="banyandb-property-v1-PropertyMissing"></a>
+
+### PropertyMissing
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| entity | [string](#string) |  |  |
+
+
+
+
+
+
+<a name="banyandb-property-v1-PropertySync"></a>
+
+### PropertySync
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| id | [bytes](#bytes) |  |  |
+| property | [Property](#banyandb-property-v1-Property) |  |  |
+| delete_time | [int64](#int64) |  |  |
+
+
+
+
+
+
+<a name="banyandb-property-v1-RepairRequest"></a>
+
+### RepairRequest
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| tree_root | [TreeRoot](#banyandb-property-v1-TreeRoot) |  | compare stage |
+| tree_slots | [TreeSlots](#banyandb-property-v1-TreeSlots) |  |  |
+| property_missing | [PropertyMissing](#banyandb-property-v1-PropertyMissing) |  | repair stage case 1: the client is missing the property but the server has it |
+| property_sync | [PropertySync](#banyandb-property-v1-PropertySync) |  | case 2: the client has the property but the server is missing it; case 3: SHA value mismatches |
+| no_more_property_sync | [NoMorePropertySync](#banyandb-property-v1-NoMorePropertySync) |  | if the client side has already sent all the properties (missing or property sync), it will not send any more properties to sync and the server side should close the stream. |
+
+
+
+
+
+
+<a name="banyandb-property-v1-RepairResponse"></a>
+
+### RepairResponse
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| root_compare | [RootCompare](#banyandb-property-v1-RootCompare) |  | compare 
stage |
+| differ_tree_summary | 
[DifferTreeSummary](#banyandb-property-v1-DifferTreeSummary) |  |  |
+| property_sync | [PropertySync](#banyandb-property-v1-PropertySync) |  | repair stage case 1: returned for a PropertyMissing request; case 3: returned if the client's copy is older |
+
+
+
+
+
+
+<a name="banyandb-property-v1-RootCompare"></a>
+
+### RootCompare
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| tree_found | [bool](#bool) |  |  |
+| root_sha_match | [bool](#bool) |  |  |
+
+
+
+
+
+
+<a name="banyandb-property-v1-TreeLeafNode"></a>
+
+### TreeLeafNode
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| slot_index | [int32](#int32) |  | slot_index is the index of the slot in the 
tree. |
+| exists | [bool](#bool) |  | if false, the server side does not have the slot. |
+| entity | [string](#string) |  | if the slot and entity exist, the SHA value of the entity. |
+| sha | [string](#string) |  |  |
+
+
+
+
+
+
+<a name="banyandb-property-v1-TreeRoot"></a>
+
+### TreeRoot
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| group | [string](#string) |  |  |
+| shard_id | [uint32](#uint32) |  |  |
+| root_sha | [string](#string) |  |  |
+
+
+
+
+
+
+<a name="banyandb-property-v1-TreeSlotSHA"></a>
+
+### TreeSlotSHA
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| slot | [int32](#int32) |  |  |
+| value | [string](#string) |  |  |
+
+
+
+
+
+
+<a name="banyandb-property-v1-TreeSlots"></a>
+
+### TreeSlots
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| slot_sha | [TreeSlotSHA](#banyandb-property-v1-TreeSlotSHA) | repeated |  |
+
+
+
+
+
+ 
+
+ 
+
+ 
+
+
+<a name="banyandb-property-v1-RepairService"></a>
+
+### RepairService
+
+
+| Method Name | Request Type | Response Type | Description |
+| ----------- | ------------ | ------------- | ------------|
+| Repair | [RepairRequest](#banyandb-property-v1-RepairRequest) stream | 
[RepairResponse](#banyandb-property-v1-RepairResponse) stream |  |
+
+ 
+
+
+
 <a name="banyandb_property_v1_rpc-proto"></a>
 <p align="right"><a href="#top">Top</a></p>
 
diff --git a/docs/concept/property-repair.md b/docs/concept/property-repair.md
index 421419fb..1c9c2050 100644
--- a/docs/concept/property-repair.md
+++ b/docs/concept/property-repair.md
@@ -153,10 +153,9 @@ After each synchronization cycle, the receiving node will 
send its trace data ba
 ## Property Repair
 
 Based on the Merkle tree and Gossip concept, the system can proceed with the Property Repair process.
-This process is scheduled to run on each data node daily at 1:00 AM(it's 
configurable as `property-background-repair-cron` flag), and follows these 
steps:
-1. **Select a Group**: The node retrieves a list of Property groups where the 
number of **replicas is greater than or equal to 2**, and randomly selects one 
group for repair.
+This process is scheduled to run on each data node daily at 2:00 AM (configurable via the `property-repair-trigger-cron` flag), and follows these steps:
+1. **Select a Group and Shard**: The node retrieves the list of Property groups where the number of **replicas is greater than or equal to 2**, randomly selects one group for repair, and then randomly selects a shard within that group (a minimal selection sketch follows this list).
 2. **Query Node List**: Then determines the list of nodes that hold replicas 
for the selected group and sends the gossip propagation message to those nodes 
to synchronize the Property data for that group.
-3. **Wait for the Result**: The initiating node waits for the final result of 
the synchronization process before proceeding.
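The selection in step 1 can be pictured with a short sketch. Everything below is illustrative: the `groupMeta` type and `pickRepairTarget` function are assumptions for this document, not BanyanDB code.

```go
package repairsketch

import "math/rand"

// groupMeta is an illustrative stand-in for the group metadata the scheduler reads.
type groupMeta struct {
	name     string
	shardNum uint32
	copies   uint32 // total copies of each property the group keeps
}

// pickRepairTarget picks one eligible group at random, then one shard within it.
func pickRepairTarget(groups []groupMeta, r *rand.Rand) (name string, shard uint32, ok bool) {
	var eligible []groupMeta
	for _, g := range groups {
		if g.copies >= 2 { // only groups with at least two copies can be repaired
			eligible = append(eligible, g)
		}
	}
	if len(eligible) == 0 {
		return "", 0, false
	}
	g := eligible[r.Intn(len(eligible))]
	return g.name, uint32(r.Intn(int(g.shardNum))), true
}
```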
 
 ### Property Synchronize between Two Nodes
 
@@ -165,20 +164,22 @@ Let’s refer to the current node as A and the target node 
as B. The process is
 
 1. **(A)Establish Connection**: Node A initiates a **bidirectional streaming 
connection** with node B to enable real-time, two-way data transfer.
 2. **(A)Iterate Over Shards**: Node A retrieves the list of all 
Property-related shards for the selected group and processes them one by one.
-3. **(A)Send Merkle Tree Summary**: For each shard, node A reads its Merkle 
Tree and sends a summary (including root SHA and slots SHA) to node B. 
+3. **(A)Send Merkle Tree Root**: For each shard, node A reads its Merkle Tree and sends the root SHA to node B.
 This allows B to quickly identify which slots may contain differences.
-4. **(B)Verify Merkle Tree Summary and Respond**: Node B compares the received 
summary against its own Merkle Tree for the same shard and group:
-   * If the root SHA matches, node B returns an empty slot list, indicating no 
differences. 
-   * If the root SHA differ, node B checks the slot SHA, identifies mismatched 
slots, and sends back all relevant leaf node details, including the **slot 
index**, **entity**, and **SHA value**.
-5. **(A)Compare Leaf Data**: Node A processes the received leaf data and takes 
the following actions: 
+4. **(B)Receive Merkle Tree Root**: Node B receives the root SHA from node A and responds with whether the root SHA values match.
+5. **(A)Send Slot Summary**: If the root SHA matches, node A finishes the synchronization for that shard and moves to the next one.
+   If the root SHA differs, node A sends a summary of its slots, including the **slot index** and **SHA value** of each slot.
+6. **(B)Verify Merkle Tree Summary and Respond**: Node B compares the received 
summary against its own Merkle Tree for the same shard and group,
+identifies mismatched slots, and sends back all relevant leaf node details, 
including the **slot index**, **entity**, and **SHA value**.
+7. **(A)Compare Leaf Data**: Node A processes the received leaf data and takes 
the following actions: 
    * For missing entities (present on B but not on A), A requests the full 
Property data from B.
    * For entities present on A, but not on B, A sends the full Property data 
to B.
    * For SHA mismatches, A sends its full Property data to B for validation.
-6. **(B)Validate Actual Data**: Node B handles the data as follows: 
+8. **(B)Validate Actual Data**: Node B handles the data as follows: 
    * For missing entities, B returns the latest version of the data.
    * For entities present on A, but not on B, B updates its local copy with 
the data from A.
    * For SHA mismatches, B uses the "last-write-win" strategy. It compares the 
version numbers. If B’s version is newer, it returns the Property data to A. If 
A’s version is newer, B updates its local copy and does not return any data. If 
the versions are the same, it selects the data from the smaller index of the 
node list; in this case, it would be from A.
-7. **(A)Update Local Data**: Node A receives updated Property data from B and 
applies the changes to its local store.
+9. **(A)Update Local Data**: Node A receives updated Property data from B and 
applies the changes to its local store.
 
 This concludes the A-to-B synchronization cycle for a given group in the 
Property repair process.
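To make the message flow above concrete, here is a compressed per-entity decision on node A, written against illustrative stand-in types; none of the names below are the generated `RepairService` API, they only mirror the roles described in steps 7-9.

```go
package repairsketch

// leafDiff captures one entity after both sides have exchanged the mismatched-slot leaves.
type leafDiff struct {
	entity     string
	localSHA   string // empty when node A does not have the entity
	peerSHA    string // empty when node B does not have the entity
	localData  []byte
	deleteTime int64
}

// peer abstracts the repair stream toward node B.
type peer interface {
	// Pull fetches a property that only B has (case 1).
	Pull(entity string) ([]byte, error)
	// Push sends A's full property; the returned bytes are non-nil only when B holds a newer version (cases 2 and 3).
	Push(entity string, data []byte, deleteTime int64) ([]byte, error)
}

func repairLeaf(p peer, d leafDiff, applyLocally func([]byte) error) error {
	switch {
	case d.localSHA == "": // case 1: missing on A, pull it from B and apply locally
		data, err := p.Pull(d.entity)
		if err != nil {
			return err
		}
		return applyLocally(data)
	case d.peerSHA == "" || d.localSHA != d.peerSHA: // cases 2 and 3: push A's copy
		newer, err := p.Push(d.entity, d.localData, d.deleteTime)
		if err != nil {
			return err
		}
		if newer != nil { // last-write-win went to B, so apply B's copy locally
			return applyLocally(newer)
		}
	}
	return nil
}
```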
 
@@ -186,7 +187,9 @@ This concludes the A-to-B synchronization cycle for a given 
group in the Propert
 
 During the synchronization process, the system will terminate or skip 
processing under the following scenarios:
 
-1. **Merkle Tree Not Built**: If either node A or node B has not yet built the 
Merkle Tree for the target group, the gossip protocol is immediately terminated.
+1. **Merkle Tree Not Built**:
+   1. If node A (the client) has not yet built the Merkle Tree for the target group, the gossip protocol is immediately terminated.
+   2. If node B (the server) has not yet built the Merkle Tree for the target group, the client skips the current server and tries the next available node to continue the gossip protocol.
 2. **Duplicate Sync Request**: If a new gossip sync request for the same group 
is received by either node while an existing synchronization is in progress, 
the new request is terminated to avoid conflicts.
 3. **Target Node Connection Failure Handling**: When the current node fails to 
connect to the target node, the following fallback logic applies:
    1. **Target Not in Connection Pool**: Skip the target, decrement the max 
round count by 1, and attempt the next available node. Trace information is 
recorded.
diff --git a/pkg/index/inverted/query.go b/pkg/index/inverted/query.go
index 9812d253..12deb057 100644
--- a/pkg/index/inverted/query.go
+++ b/pkg/index/inverted/query.go
@@ -683,6 +683,25 @@ func BuildPropertyQuery(req *propertyv1.QueryRequest, 
groupField, idField string
        }, nil
 }
 
+// BuildPropertyQueryFromEntity builds a property query from entity 
information.
+func BuildPropertyQueryFromEntity(groupField, group, name, entityIDField, 
entityID string) (index.Query, error) {
+       if group == "" || name == "" || entityID == "" {
+               return nil, errors.New("group, name and entityID are mandatory 
for property query")
+       }
+       bq := bluge.NewBooleanQuery()
+       bn := newMustNode()
+       bq.AddMust(bluge.NewTermQuery(group).SetField(groupField))
+       bn.Append(newTermNode(group, nil))
+       bq.AddMust(bluge.NewTermQuery(name).SetField(index.IndexModeName))
+       bn.Append(newTermNode(name, nil))
+       bq.AddMust(bluge.NewTermQuery(entityID).SetField(entityIDField))
+       bn.Append(newTermNode(entityID, nil))
+       return &queryNode{
+               query: bq,
+               node:  bn,
+       }, nil
+}
+
 var (
        _              logical.Schema = (*schema)(nil)
        schemaInstance                = &schema{}

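A hedged usage sketch for the new `BuildPropertyQueryFromEntity` helper above, imagined from inside the `property` package: the `findPropertyByEntity` wrapper and the lookup values are assumptions, while `groupField` and `entityID` are the index field names that package already passes to `BuildPropertyQuery`.

```go
// findPropertyByEntity is a hypothetical helper showing how the repair code
// path might look up the documents behind a single entity reported by a peer.
func findPropertyByEntity(ctx context.Context, s *shard, group, name, entity string) ([]*queryProperty, error) {
	query, err := inverted.BuildPropertyQueryFromEntity(groupField, group, name, entityID, entity)
	if err != nil {
		return nil, fmt.Errorf("build entity query failure: %w", err)
	}
	// one entity maps to at most a handful of documents (the current version plus tombstones)
	return s.search(ctx, query, 10)
}
```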