This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch dbsl
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 04d5ffb6b5b019651fc97efa3bd61d8c6f605af1
Author: mrproliu <741550...@qq.com>
AuthorDate: Tue Aug 26 22:15:37 2025 +0700

    Add benchmark and fix some issues in background property repair (#725)
---
 .github/workflows/slow-test.yml                    |   7 +
 api/proto/banyandb/property/v1/repair.proto        |  20 +-
 banyand/liaison/grpc/property.go                   |   9 +-
 banyand/property/gossip/client.go                  |  15 +-
 banyand/property/gossip/server.go                  |   9 +-
 banyand/property/gossip/service.go                 |   2 +
 banyand/property/gossip/trace.go                   |  12 +-
 banyand/property/repair.go                         | 224 +++++++----
 banyand/property/repair_gossip.go                  | 185 +++++----
 bydbctl/internal/cmd/property_test.go              | 244 ++++++++++++
 docs/api-reference.md                              |  58 ++-
 test/docker/base-compose.yml                       |   1 +
 test/property_repair/README.md                     | 142 +++++++
 test/property_repair/base-compose.yml              | 111 ++++++
 test/property_repair/full_data/cpu-usage.png       | Bin 0 -> 60836 bytes
 .../full_data/docker-compose-3nodes.yml            |  65 ++++
 test/property_repair/full_data/integrated_test.go  | 192 +++++++++
 test/property_repair/half_data/cpu-usage.png       | Bin 0 -> 54254 bytes
 .../half_data/docker-compose-3nodes.yml            |  76 ++++
 test/property_repair/half_data/integrated_test.go  | 219 +++++++++++
 test/property_repair/prometheus-3nodes.yml         |  43 +++
 .../same_data/docker-compose-3nodes.yml            |  65 ++++
 test/property_repair/same_data/integrated_test.go  | 178 +++++++++
 test/property_repair/shared_utils.go               | 428 +++++++++++++++++++++
 24 files changed, 2144 insertions(+), 161 deletions(-)

diff --git a/.github/workflows/slow-test.yml b/.github/workflows/slow-test.yml
index c62496d3..8974c19d 100644
--- a/.github/workflows/slow-test.yml
+++ b/.github/workflows/slow-test.yml
@@ -31,3 +31,10 @@ jobs:
     with:
       options: --label-filter slow
       timeout-minutes: 120
+
+  property-repair:
+    if: github.repository == 'apache/skywalking-banyandb'
+    uses: ./.github/workflows/test.yml
+    with:
+      options: --label-filter property_repair
+      timeout-minutes: 120
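
The new property-repair job reuses the shared test workflow and picks up suites by Ginkgo label, the same way the existing slow job uses "--label-filter slow". As a rough, hypothetical sketch only (the actual wiring under test/property_repair/ may differ), a suite selected by "--label-filter property_repair" could be registered like this:

package property_repair_test

import (
	"testing"

	. "github.com/onsi/ginkgo/v2"
	. "github.com/onsi/gomega"
)

// TestPropertyRepair registers the suite; the Label decorator is what the
// workflow's --label-filter property_repair matches against.
func TestPropertyRepair(t *testing.T) {
	RegisterFailHandler(Fail)
	RunSpecs(t, "Property Repair Suite", Label("property_repair"))
}

var _ = Describe("background property repair", Label("property_repair"), func() {
	It("eventually converges replicas", func() {
		Expect(true).To(BeTrue()) // placeholder body for illustration
	})
})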
diff --git a/api/proto/banyandb/property/v1/repair.proto 
b/api/proto/banyandb/property/v1/repair.proto
index 24510ed9..7c158598 100644
--- a/api/proto/banyandb/property/v1/repair.proto
+++ b/api/proto/banyandb/property/v1/repair.proto
@@ -69,7 +69,18 @@ message PropertySync {
   int64 delete_time = 3;
 }
 
-message NoMorePropertySync {}
+enum PropertySyncFromType {
+  PROPERTY_SYNC_FROM_TYPE_UNSPECIFIED = 0;
+  PROPERTY_SYNC_FROM_TYPE_MISSING = 1; // client missing but server existing
+  PROPERTY_SYNC_FROM_TYPE_SYNC = 2; // client existing but server missing or 
SHA value mismatches
+}
+
+message PropertySyncWithFrom {
+  PropertySyncFromType from = 1;
+  PropertySync property = 2;
+}
+
+message WaitNextDifferData {}
 
 message RepairRequest {
   oneof data {
@@ -82,9 +93,8 @@ message RepairRequest {
     // case 2: client existing but server missing
     // case 3: SHA value mismatches
     PropertySync property_sync = 4;
-    // if client side is already send all the properties(missing or property 
sync)
-    // which means the client side will not sending more properties to sync, 
server side should close the stream.
-    NoMorePropertySync no_more_property_sync = 5;
+    // wait for the next differ tree summary to process
+    WaitNextDifferData wait_next_differ = 5;
   }
 }
 
@@ -96,7 +106,7 @@ message RepairResponse {
     // repair stage
     // case 1: return from PropertyMissing
     // case 3: return if the client is older
-    PropertySync property_sync = 3;
+    PropertySyncWithFrom property_sync = 3;
   }
 }
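
To make the revised stream protocol easier to follow, here is a minimal, illustrative Go sketch (not part of the commit) of the two new shapes: the server now wraps each pushed PropertySync in a PropertySyncWithFrom carrying its origin, and the client acknowledges every differ batch with WaitNextDifferData instead of ending the exchange with NoMorePropertySync. The generated package path is assumed.

package main

import (
	"fmt"

	propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
)

// serverPush wraps a PropertySync with its origin, so the client can tell a
// "missing on client" push apart from a "client copy is stale" push.
func serverPush(sync *propertyv1.PropertySync) *propertyv1.RepairResponse {
	return &propertyv1.RepairResponse{
		Data: &propertyv1.RepairResponse_PropertySync{
			PropertySync: &propertyv1.PropertySyncWithFrom{
				From:     propertyv1.PropertySyncFromType_PROPERTY_SYNC_FROM_TYPE_MISSING,
				Property: sync,
			},
		},
	}
}

// clientAck signals that the current differ batch has been processed and the
// server may send the next DifferTreeSummary.
func clientAck() *propertyv1.RepairRequest {
	return &propertyv1.RepairRequest{
		Data: &propertyv1.RepairRequest_WaitNextDiffer{
			WaitNextDiffer: &propertyv1.WaitNextDifferData{},
		},
	}
}

func main() {
	fmt.Println(serverPush(&propertyv1.PropertySync{}), clientAck())
}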
 
diff --git a/banyand/liaison/grpc/property.go b/banyand/liaison/grpc/property.go
index 9fe4aa7e..cd4921c7 100644
--- a/banyand/liaison/grpc/property.go
+++ b/banyand/liaison/grpc/property.go
@@ -19,6 +19,7 @@ package grpc
 
 import (
        "context"
+       "fmt"
        "math"
        "sync"
        "time"
@@ -280,15 +281,21 @@ func (ps *propertyServer) replaceProperty(ctx 
context.Context, now time.Time, sh
                Id:       propertypkg.GetPropertyID(cur),
                Property: cur,
        }
+       var successCount int
        futures := make([]bus.Future, 0, len(nodes))
        for _, node := range nodes {
                f, err := ps.pipeline.Publish(ctx, data.TopicPropertyUpdate,
                        
bus.NewMessageWithNode(bus.MessageID(time.Now().Unix()), node, req))
                if err != nil {
-                       return nil, errors.Wrapf(err, "failed to publish 
property update to node %s", node)
+                       ps.log.Debug().Err(err).Str("node", node).Msg("failed 
to publish property update")
+                       continue
                }
+               successCount++
                futures = append(futures, f)
        }
+       if successCount == 0 {
+               return nil, fmt.Errorf("failed to publish property update to 
any node")
+       }
        // Wait for all futures to complete, and which should last have one 
success
        haveSuccess := false
        var lastestError error
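
The replaceProperty change above switches from aborting on the first failed publish to a best-effort fan-out that errors out only when no node accepted the update. A generic sketch of that policy, using a hypothetical publish helper rather than the repository's API:

package main

import (
	"errors"
	"fmt"
)

// publishToAll applies the "at least one success" policy: individual publish
// failures are tolerated, and an error is returned only if every node failed.
func publishToAll(nodes []string, publish func(node string) error) error {
	success := 0
	for _, n := range nodes {
		if err := publish(n); err != nil {
			continue // a single unreachable node no longer fails the write
		}
		success++
	}
	if success == 0 {
		return fmt.Errorf("failed to publish property update to any node")
	}
	return nil
}

func main() {
	err := publishToAll([]string{"node-a", "node-b"}, func(node string) error {
		if node == "node-a" {
			return errors.New("connection refused")
		}
		return nil // node-b accepts the update
	})
	fmt.Println(err) // <nil>: one success is enough
}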
diff --git a/banyand/property/gossip/client.go 
b/banyand/property/gossip/client.go
index 2bf87ca5..5342e304 100644
--- a/banyand/property/gossip/client.go
+++ b/banyand/property/gossip/client.go
@@ -114,6 +114,7 @@ func (s *service) getRegisteredNode(id string) 
(*databasev1.Node, bool) {
        s.mu.RLock()
        defer s.mu.RUnlock()
        node, exist := s.registered[id]
+       s.log.Debug().Str("node", id).Bool("exist", 
exist).Int("register_count", len(s.registered)).Msg("get registered gossip 
node")
        return node, exist
 }
 
@@ -121,6 +122,9 @@ func (s *service) OnAddOrUpdate(md schema.Metadata) {
        if s.traceStreamSelector != nil {
                s.traceStreamSelector.(schema.EventHandler).OnAddOrUpdate(md)
        }
+       if selEventHandler, ok := s.sel.(schema.EventHandler); ok {
+               selEventHandler.OnAddOrUpdate(md)
+       }
        if md.Kind != schema.KindNode {
                return
        }
@@ -129,10 +133,6 @@ func (s *service) OnAddOrUpdate(md schema.Metadata) {
                s.log.Warn().Msg("invalid metadata type")
                return
        }
-       s.sel.AddNode(node)
-       if s.traceStreamSelector != nil {
-               s.traceStreamSelector.AddNode(node)
-       }
        address := node.PropertyRepairGossipGrpcAddress
        if address == "" {
                s.log.Warn().Stringer("node", node).Msg("node does not have 
gossip address, skipping registration")
@@ -143,6 +143,10 @@ func (s *service) OnAddOrUpdate(md schema.Metadata) {
                s.log.Warn().Stringer("node", node).Msg("node does not have a 
name, skipping registration")
                return
        }
+       s.sel.AddNode(node)
+       if s.traceStreamSelector != nil {
+               s.traceStreamSelector.AddNode(node)
+       }
 
        s.mu.Lock()
        defer s.mu.Unlock()
@@ -155,6 +159,9 @@ func (s *service) OnDelete(md schema.Metadata) {
        if s.traceStreamSelector != nil {
                s.traceStreamSelector.(schema.EventHandler).OnDelete(md)
        }
+       if selEventHandler, ok := s.sel.(schema.EventHandler); ok {
+               selEventHandler.OnDelete(md)
+       }
        if md.Kind != schema.KindNode {
                return
        }
diff --git a/banyand/property/gossip/server.go 
b/banyand/property/gossip/server.go
index aa5fc36c..689586d6 100644
--- a/banyand/property/gossip/server.go
+++ b/banyand/property/gossip/server.go
@@ -51,7 +51,7 @@ var (
        }]}`, serviceName)
 
        // perNodeSyncTimeout is the timeout for each node to sync the property 
data.
-       perNodeSyncTimeout = time.Minute * 10
+       perNodeSyncTimeout = time.Hour * 1
 )
 
 func (s *service) Subscribe(listener MessageListener) {
@@ -188,11 +188,15 @@ func (q *protocolHandler) handle(ctx context.Context, 
request *handlingRequest)
                if err != nil {
                        q.s.serverMetrics.totalSendToNextErr.Inc(1, 
request.Group)
                        handlingSpan.Error(err.Error())
+                       q.s.log.Warn().Err(err).Stringer("request", request).
+                               Msgf("failed to handle gossip message for 
propagation")
                }
                q.s.serverMetrics.totalFinished.Inc(1, request.Group)
                q.s.serverMetrics.totalLatency.Inc(n.Sub(time.Unix(0, 
now)).Seconds(), request.Group)
                if !needsKeepPropagation {
-                       
q.s.serverMetrics.totalPropagationCount.Inc(float64(request.Context.CurrentPropagationCount),
+                       q.s.log.Info().Str("group", 
request.Group).Uint32("shardNum", request.ShardId).
+                               Msgf("propagation message for propagation is 
finished")
+                       q.s.serverMetrics.totalPropagationCount.Inc(1,
                                request.Group, request.Context.OriginNode)
                        q.s.serverMetrics.totalPropagationPercent.Observe(
                                
float64(request.Context.CurrentPropagationCount)/float64(request.Context.MaxPropagationCount),
 request.Group)
@@ -388,6 +392,7 @@ func (s *service) newConnectionFromNode(n *databasev1.Node) 
(*grpc.ClientConn, e
                return nil, fmt.Errorf("failed to get client transport 
credentials: %w", err)
        }
        conn, err := grpc.NewClient(n.PropertyRepairGossipGrpcAddress, 
append(credOpts, grpc.WithDefaultServiceConfig(retryPolicy))...)
+       s.log.Debug().Str("address", 
n.PropertyRepairGossipGrpcAddress).Msg("starting to create gRPC client 
connection to node")
        if err != nil {
                return nil, fmt.Errorf("failed to create gRPC client connection 
to node %s: %w", n.PropertyRepairGossipGrpcAddress, err)
        }
diff --git a/banyand/property/gossip/service.go 
b/banyand/property/gossip/service.go
index 0f76fa4a..db44a65d 100644
--- a/banyand/property/gossip/service.go
+++ b/banyand/property/gossip/service.go
@@ -112,6 +112,7 @@ func NewMessenger(omr observability.MetricsRegistry, 
metadata metadata.Repo, pip
                registered:       make(map[string]*databasev1.Node),
                scheduleInterval: time.Hour * 2,
                sel:              node.NewRoundRobinSelector("", metadata),
+               port:             17932,
        }
 }
 
@@ -133,6 +134,7 @@ func (s *service) PreRun(ctx context.Context) error {
        s.listeners = make([]MessageListener, 0)
        s.serverMetrics = newServerMetrics(s.omr.With(serverScope))
        if s.metadata != nil {
+               s.sel.OnInit([]schema.Kind{schema.KindGroup})
                s.metadata.RegisterHandler("property-repair-nodes", 
schema.KindNode, s)
                s.metadata.RegisterHandler("property-repair-groups", 
schema.KindGroup, s)
                if err := s.initTracing(ctx); err != nil {
diff --git a/banyand/property/gossip/trace.go b/banyand/property/gossip/trace.go
index 1f18e132..6f5ed6da 100644
--- a/banyand/property/gossip/trace.go
+++ b/banyand/property/gossip/trace.go
@@ -22,6 +22,7 @@ import (
        "encoding/json"
        "errors"
        "fmt"
+       "sync"
        "sync/atomic"
        "time"
 
@@ -285,11 +286,12 @@ type recordTrace struct {
        id          string
        allSpans    []*recordTraceSpan
        roundNum    int
+       lock        sync.Mutex
 }
 
 func (r *recordTrace) CreateSpan(parent Span, message string) Span {
        spanID := fmt.Sprintf("%s_%d", r.s.nodeID, time.Now().UnixNano())
-       r.request.TraceContext.ParentSpanId = spanID
+       r.changeParentID(spanID)
        span := &recordTraceSpan{
                trace:     r,
                id:        spanID,
@@ -302,6 +304,12 @@ func (r *recordTrace) CreateSpan(parent Span, message 
string) Span {
        return span
 }
 
+func (r *recordTrace) changeParentID(id string) {
+       r.lock.Lock()
+       defer r.lock.Unlock()
+       r.request.TraceContext.ParentSpanId = id
+}
+
 func (r *recordTrace) ActivateSpan() Span {
        return r.currentSpan
 }
@@ -350,7 +358,7 @@ func (r *recordTraceSpan) End() {
        // if still have parent span, then this is not the root span
        // change the context to parent span
        if r.parent != nil {
-               r.trace.request.TraceContext.ParentSpanId = r.parent.ID()
+               r.trace.changeParentID(r.parent.ID())
        }
 }
 
diff --git a/banyand/property/repair.go b/banyand/property/repair.go
index cf174201..fb0b07a3 100644
--- a/banyand/property/repair.go
+++ b/banyand/property/repair.go
@@ -62,7 +62,6 @@ import (
 
 const (
        repairBatchSearchSize = 100
-       repairFileNewLine     = '\n'
 )
 
 type repair struct {
@@ -126,6 +125,7 @@ func (r *repair) checkHasUpdates() (bool, error) {
 }
 
 func (r *repair) buildStatus(ctx context.Context, snapshotPath string, group 
string) (err error) {
+       r.l.Debug().Msgf("starting building status from snapshot path %s, 
group: %s", snapshotPath, group)
        startTime := time.Now()
        defer func() {
                r.metrics.totalBuildTreeFinished.Inc(1)
@@ -449,20 +449,27 @@ func (r *repairTreeFileReader) seekPosition(offset int64, 
whence int) error {
        return nil
 }
 
-func (r *repairTreeFileReader) read(parent *repairTreeNode, pagingSize int64, 
forceReFromStart bool) ([]*repairTreeNode, error) {
+func (r *repairTreeFileReader) read(parent *repairTreeNode, pagingSize int64, 
forceReFromStart bool) (nodes []*repairTreeNode, err error) {
+       defer func() {
+               recoverErr := recover()
+               if recoverErr != nil {
+                       if recoverData, ok := recoverErr.(error); ok && err == 
nil {
+                               err = fmt.Errorf("reading repair tree file %s 
failure: %w", r.file.Name(), recoverData)
+                       }
+               }
+       }()
        if parent == nil {
                // reading the root node
-               err := r.seekPosition(r.footer.slotNodeFinishedOffset, 
io.SeekStart)
-               if err != nil {
+               if err = r.seekPosition(r.footer.slotNodeFinishedOffset, 
io.SeekStart); err != nil {
                        return nil, fmt.Errorf("seeking to root node offset %d 
in file %s failure: %w", r.footer.slotNodeFinishedOffset, r.file.Name(), err)
                }
                rootDataBytes := make([]byte, r.footer.rootNodeLen)
                if _, err = io.ReadFull(r.reader, rootDataBytes); err != nil {
                        return nil, fmt.Errorf("reading root node data from 
file %s failure: %w", r.file.Name(), err)
                }
-               _, shaValue, err := encoding.DecodeBytes(rootDataBytes)
-               if err != nil {
-                       return nil, fmt.Errorf("decoding root node sha value 
from file %s failure: %w", r.file.Name(), err)
+               _, shaValue, decodeErr := encoding.DecodeBytes(rootDataBytes)
+               if decodeErr != nil {
+                       return nil, fmt.Errorf("decoding root node sha value 
from file %s failure: %w", r.file.Name(), decodeErr)
                }
                return []*repairTreeNode{
                        {
@@ -472,54 +479,8 @@ func (r *repairTreeFileReader) read(parent 
*repairTreeNode, pagingSize int64, fo
                }, nil
        }
 
-       var err error
        if parent.tp == repairTreeNodeTypeRoot {
-               needSeek := false
-               if r.paging == nil || r.paging.lastNode != parent || 
forceReFromStart {
-                       needSeek = true
-                       r.paging = newRepairTreeReaderPage(parent, 
r.footer.slotNodeCount)
-               }
-               if needSeek {
-                       // reading the slot nodes
-                       if err = 
r.seekPosition(r.footer.leafNodeFinishedOffset, io.SeekStart); err != nil {
-                               return nil, fmt.Errorf("seeking to slot node 
offset %d in file %s failure: %w", r.footer.leafNodeFinishedOffset, 
r.file.Name(), err)
-                       }
-               }
-               var slotNodeIndex, leafStartOff, leafCount int64
-               var slotShaVal, slotDataBytes []byte
-               count := r.paging.nextPage(pagingSize)
-               nodes := make([]*repairTreeNode, 0, count)
-               for i := int64(0); i < count; i++ {
-                       slotDataBytes, err = 
r.reader.ReadBytes(repairFileNewLine)
-                       if err != nil {
-                               return nil, fmt.Errorf("reading slot node data 
from file %s failure: %w", r.file.Name(), err)
-                       }
-                       slotDataBytes, slotNodeIndex, err = 
encoding.BytesToVarInt64(slotDataBytes)
-                       if err != nil {
-                               return nil, fmt.Errorf("decoding slot node 
index from file %s failure: %w", r.file.Name(), err)
-                       }
-                       slotDataBytes, slotShaVal, err = 
encoding.DecodeBytes(slotDataBytes)
-                       if err != nil {
-                               return nil, fmt.Errorf("decoding slot node sha 
value from file %s failure: %w", r.file.Name(), err)
-                       }
-                       slotDataBytes, leafStartOff, err = 
encoding.BytesToVarInt64(slotDataBytes)
-                       if err != nil {
-                               return nil, fmt.Errorf("decoding slot node leaf 
start offset from file %s failure: %w", r.file.Name(), err)
-                       }
-                       _, leafCount, err = 
encoding.BytesToVarInt64(slotDataBytes)
-                       if err != nil {
-                               return nil, fmt.Errorf("decoding slot node leaf 
length from file %s failure: %w", r.file.Name(), err)
-                       }
-                       nodes = append(nodes, &repairTreeNode{
-                               shaValue:  string(slotShaVal),
-                               slotInx:   int32(slotNodeIndex),
-                               tp:        repairTreeNodeTypeSlot,
-                               leafStart: leafStartOff,
-                               leafCount: leafCount,
-                       })
-               }
-
-               return nodes, nil
+               return r.readSlots(parent, pagingSize, forceReFromStart)
        } else if parent.tp == repairTreeNodeTypeLeaf {
                return nil, nil
        }
@@ -538,11 +499,16 @@ func (r *repairTreeFileReader) read(parent 
*repairTreeNode, pagingSize int64, fo
        }
        var entity, shaVal []byte
        count := r.paging.nextPage(pagingSize)
-       nodes := make([]*repairTreeNode, 0, count)
        for i := int64(0); i < count; i++ {
-               leafDataBytes, err := r.reader.ReadBytes(repairFileNewLine)
+               var dataSize int64
+               err = binary.Read(r.reader, binary.LittleEndian, &dataSize)
                if err != nil {
-                       return nil, fmt.Errorf("reading leaf node data from 
file %s failure: %w", r.file.Name(), err)
+                       return nil, fmt.Errorf("reading leaf node data size 
from file %s failure: %w", r.file.Name(), err)
+               }
+               leafDataBytes := make([]byte, dataSize)
+               leafReadLen, err := io.ReadFull(r.reader, leafDataBytes)
+               if err != nil {
+                       return nil, fmt.Errorf("reading leaf node data from 
file %s failure(readed: %d): %w", r.file.Name(), leafReadLen, err)
                }
                leafDataBytes, entity, err = encoding.DecodeBytes(leafDataBytes)
                if err != nil {
@@ -562,6 +528,61 @@ func (r *repairTreeFileReader) read(parent 
*repairTreeNode, pagingSize int64, fo
        return nodes, nil
 }
 
+func (r *repairTreeFileReader) readSlots(parent *repairTreeNode, pagingSize 
int64, forceReFromStart bool) (nodes []*repairTreeNode, err error) {
+       needSeek := false
+       if r.paging == nil || r.paging.lastNode != parent || forceReFromStart {
+               needSeek = true
+               r.paging = newRepairTreeReaderPage(parent, 
r.footer.slotNodeCount)
+       }
+       if needSeek {
+               // reading the slot nodes
+               if err = r.seekPosition(r.footer.leafNodeFinishedOffset, 
io.SeekStart); err != nil {
+                       return nil, fmt.Errorf("seeking to slot node offset %d 
in file %s failure: %w", r.footer.leafNodeFinishedOffset, r.file.Name(), err)
+               }
+       }
+       var slotNodeIndex, leafStartOff, leafCount int64
+       var slotShaVal, slotDataBytes []byte
+       count := r.paging.nextPage(pagingSize)
+       for i := int64(0); i < count; i++ {
+               var dataSize int64
+               err = binary.Read(r.reader, binary.LittleEndian, &dataSize)
+               if err != nil {
+                       return nil, fmt.Errorf("reading slot node data size 
from file %s failure: %w", r.file.Name(), err)
+               }
+               slotDataBytes = make([]byte, dataSize)
+               var readSize int
+               readSize, err = io.ReadFull(r.reader, slotDataBytes)
+               if err != nil {
+                       return nil, fmt.Errorf("reading slot node data from 
file %s failure: %w, read %d bytes", r.file.Name(), err, readSize)
+               }
+               slotDataBytes, slotNodeIndex, err = 
encoding.BytesToVarInt64(slotDataBytes)
+               if err != nil {
+                       return nil, fmt.Errorf("decoding slot node index from 
file %s failure: %w", r.file.Name(), err)
+               }
+               slotDataBytes, slotShaVal, err = 
encoding.DecodeBytes(slotDataBytes)
+               if err != nil {
+                       return nil, fmt.Errorf("decoding slot node sha value 
from file %s failure: %w", r.file.Name(), err)
+               }
+               slotDataBytes, leafStartOff, err = 
encoding.BytesToVarInt64(slotDataBytes)
+               if err != nil {
+                       return nil, fmt.Errorf("decoding slot node leaf start 
offset from file %s failure: %w", r.file.Name(), err)
+               }
+               _, leafCount, err = encoding.BytesToVarInt64(slotDataBytes)
+               if err != nil {
+                       return nil, fmt.Errorf("decoding slot node leaf length 
from file %s failure: %w", r.file.Name(), err)
+               }
+               nodes = append(nodes, &repairTreeNode{
+                       shaValue:  string(slotShaVal),
+                       slotInx:   int32(slotNodeIndex),
+                       tp:        repairTreeNodeTypeSlot,
+                       leafStart: leafStartOff,
+                       leafCount: leafCount,
+               })
+       }
+
+       return nodes, nil
+}
+
 func (r *repairTreeFileReader) close() error {
        return r.file.Close()
 }
@@ -756,7 +777,10 @@ func (r *repairSlotFile) append(entity, shaValue []byte) 
error {
        result := make([]byte, 0)
        result = encoding.EncodeBytes(result, entity)
        result = encoding.EncodeBytes(result, shaValue)
-       result = append(result, repairFileNewLine)
+       err = binary.Write(r.writer, binary.LittleEndian, int64(len(result)))
+       if err != nil {
+               return fmt.Errorf("writing entity and sha value length to 
repair slot file %s failure: %w", r.path, err)
+       }
        _, err = r.writer.Write(result)
        if err != nil {
                return fmt.Errorf("writing entity and sha value to repair slot 
file %s failure: %w", r.path, err)
@@ -846,7 +870,15 @@ func (r *repairTreeBuilder) build() (err error) {
                data = encoding.EncodeBytes(data, slot.shaVal)
                data = encoding.VarInt64ToBytes(data, slot.startOff)
                data = encoding.VarInt64ToBytes(data, slot.leafCount)
-               data = append(data, repairFileNewLine)
+
+               slotTotalLen := int64(len(data))
+               err = binary.Write(r.writer, binary.LittleEndian, slotTotalLen)
+               if err != nil {
+                       return fmt.Errorf("writing slot node to repair tree 
file failure: %w", err)
+               }
+               slotNodesLen += 8
+               slotNodesFinishedOffset += 8
+
                writedLen, err = r.writer.Write(data)
                if err != nil {
                        return fmt.Errorf("writing slot node to repair tree 
file failure: %w", err)
@@ -932,18 +964,20 @@ type repairScheduler struct {
        latestBuildTreeSchedule   time.Time
        buildTreeClock            clock.Clock
        gossipMessenger           gossip.Messenger
-       closer                    *run.Closer
-       buildSnapshotFunc         func(context.Context) (string, error)
+       l                         *logger.Logger
+       gossipRepairing           *int32
        repairTreeScheduler       *timestamp.Scheduler
        quickRepairNotified       *int32
        db                        *database
-       l                         *logger.Logger
+       closer                    *run.Closer
        metrics                   *repairSchedulerMetrics
-       treeSlotCount             int
+       buildSnapshotFunc         func(context.Context) (string, error)
+       scheduleBasicFile         string
        buildTreeScheduleInterval time.Duration
        quickBuildTreeTime        time.Duration
-       lastBuildTimeLocker       sync.Mutex
+       treeSlotCount             int
        treeLocker                sync.RWMutex
+       lastBuildTimeLocker       sync.Mutex
 }
 
 // nolint: contextcheck
@@ -970,6 +1004,8 @@ func newRepairScheduler(
                metrics:             
newRepairSchedulerMetrics(omr.With(propertyScope.SubScope("scheduler"))),
                gossipMessenger:     gossipMessenger,
                treeSlotCount:       treeSlotCount,
+               scheduleBasicFile:   filepath.Join(db.repairBaseDir, 
"scheduled.json"),
+               gossipRepairing:     new(int32),
        }
        c := timestamp.NewScheduler(l, s.buildTreeClock)
        s.repairTreeScheduler = c
@@ -982,10 +1018,13 @@ func newRepairScheduler(
        }
        err = c.Register("trigger", 
cron.Minute|cron.Hour|cron.Dom|cron.Month|cron.Dow|cron.Descriptor,
                triggerCronExp, func(time.Time, *logger.Logger) bool {
-                       gossipErr := s.doRepairGossip(s.closer.Ctx())
+                       l.Debug().Msgf("starting background repair gossip")
+                       group, shardNum, nodes, gossipErr := 
s.doRepairGossip(s.closer.Ctx())
                        if gossipErr != nil {
                                s.l.Err(gossipErr).Msg("failed to repair 
gossip")
+                               return true
                        }
+                       s.l.Info().Str("group", group).Uint32("shardNum", 
shardNum).Strs("nodes", nodes).Msg("background repair gossip scheduled")
                        return true
                })
        if err != nil {
@@ -998,6 +1037,14 @@ func newRepairScheduler(
        return s, nil
 }
 
+func (r *repairScheduler) setGossipRepairing(repairing bool) {
+       val := int32(0)
+       if repairing {
+               val = 1
+       }
+       atomic.StoreInt32(r.gossipRepairing, val)
+}
+
 func (r *repairScheduler) doBuildTreeScheduler(t time.Time, triggerByCron 
bool) {
        if !r.verifyShouldExecuteBuildTree(t, triggerByCron) {
                return
@@ -1012,6 +1059,12 @@ func (r *repairScheduler) doBuildTreeScheduler(t 
time.Time, triggerByCron bool)
 func (r *repairScheduler) verifyShouldExecuteBuildTree(t time.Time, 
triggerByCron bool) bool {
        r.lastBuildTimeLocker.Lock()
        defer r.lastBuildTimeLocker.Unlock()
+       // skip building the tree while gossip repair is in progress
+       if atomic.LoadInt32(r.gossipRepairing) == 1 {
+               r.l.Debug().Msg("gossip repairing is in progress, skipping 
build tree")
+               return false
+       }
+
        if !triggerByCron {
                // if not triggered by cron, we need to check if the time is 
after the (last scheduled time + half of the interval)
                if 
r.buildTreeClock.Now().After(r.latestBuildTreeSchedule.Add(r.buildTreeScheduleInterval
 / 2)) {
@@ -1035,6 +1088,33 @@ func (r *repairScheduler) initializeInterval() error {
        return nil
 }
 
+func (r *repairScheduler) saveHasBuildTree() error {
+       // record the latest build tree time in the local schedule status file
+       now := time.Now()
+       data := make(map[string]string, 1)
+       data["lastBuildTreeTime"] = now.Format(time.RFC3339)
+       json, err := json.Marshal(data)
+       if err != nil {
+               return err
+       }
+       err = os.MkdirAll(path.Dir(r.scheduleBasicFile), storage.FilePerm)
+       if err != nil {
+               return fmt.Errorf("creating directory for repair build tree 
file %s failure: %w", r.scheduleBasicFile, err)
+       }
+       return os.WriteFile(r.scheduleBasicFile, json, storage.FilePerm)
+}
+
+func (r *repairScheduler) checkHasBuildTree() (bool, error) {
+       _, err := os.Stat(r.scheduleBasicFile)
+       if err != nil {
+               if os.IsNotExist(err) {
+                       return false, nil // no build tree file, means no build 
tree has been done
+               }
+               return false, fmt.Errorf("checking repair build tree file 
existence failure: %w", err)
+       }
+       return true, nil
+}
+
 //nolint:contextcheck
 func (r *repairScheduler) doBuildTree() (err error) {
        now := time.Now()
@@ -1043,8 +1123,14 @@ func (r *repairScheduler) doBuildTree() (err error) {
                r.metrics.totalRepairBuildTreeFinished.Inc(1)
                
r.metrics.totalRepairBuildTreeLatency.Inc(time.Since(now).Seconds())
                if err != nil {
+                       r.l.Err(err).Msg("repair build tree failed")
                        r.metrics.totalRepairBuildTreeFailures.Inc(1)
                }
+
+               saveStatusErr := r.saveHasBuildTree()
+               if saveStatusErr != nil {
+                       r.l.Err(saveStatusErr).Msgf("saving repair build tree 
status failure")
+               }
        }()
        sLst := r.db.sLst.Load()
        if sLst == nil {
@@ -1140,17 +1226,17 @@ func (r *repairScheduler) close() {
        r.closer.CloseThenWait()
 }
 
-func (r *repairScheduler) doRepairGossip(ctx context.Context) error {
+func (r *repairScheduler) doRepairGossip(ctx context.Context) (string, uint32, 
[]string, error) {
        group, shardNum, err := r.randomSelectGroup(ctx)
        if err != nil {
-               return fmt.Errorf("selecting random group failure: %w", err)
+               return "", 0, nil, fmt.Errorf("selecting random group failure: 
%w", err)
        }
 
        nodes, err := r.gossipMessenger.LocateNodes(group.Metadata.Name, 
shardNum, uint32(r.copiesCount(group)))
        if err != nil {
-               return fmt.Errorf("locating nodes for group %s, shard %d 
failure: %w", group.Metadata.Name, shardNum, err)
+               return "", 0, nil, fmt.Errorf("locating nodes for group %s, 
shard %d failure: %w", group.Metadata.Name, shardNum, err)
        }
-       return r.gossipMessenger.Propagation(nodes, group.Metadata.Name, 
shardNum)
+       return group.Metadata.Name, shardNum, nodes, 
r.gossipMessenger.Propagation(nodes, group.Metadata.Name, shardNum)
 }
 
 func (r *repairScheduler) randomSelectGroup(ctx context.Context) 
(*commonv1.Group, uint32, error) {
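
Two details of the repair.go changes are easy to miss in the hunks above: the tree file moves from newline-terminated records to length-prefixed records (an int64 little-endian size followed by the encoded payload, which is why the slot offsets grow by 8 bytes per record), and a new gossipRepairing flag suppresses tree rebuilds while a gossip round is running. A small, self-contained sketch of the framing, using only the standard library:

package main

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// writeRecord frames a payload as <int64 little-endian length><payload>,
// mirroring the switch away from '\n'-terminated records.
func writeRecord(w io.Writer, payload []byte) error {
	if err := binary.Write(w, binary.LittleEndian, int64(len(payload))); err != nil {
		return err
	}
	_, err := w.Write(payload)
	return err
}

// readRecord reads one framed record back.
func readRecord(r io.Reader) ([]byte, error) {
	var size int64
	if err := binary.Read(r, binary.LittleEndian, &size); err != nil {
		return nil, err
	}
	buf := make([]byte, size)
	if _, err := io.ReadFull(r, buf); err != nil {
		return nil, err
	}
	return buf, nil
}

func main() {
	var b bytes.Buffer
	// the payload may now contain '\n' without corrupting the record stream
	_ = writeRecord(&b, []byte("entity\nsha-value"))
	rec, _ := readRecord(bufio.NewReader(&b))
	fmt.Printf("%q\n", rec)
}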
diff --git a/banyand/property/repair_gossip.go 
b/banyand/property/repair_gossip.go
index 4468c300..5babb981 100644
--- a/banyand/property/repair_gossip.go
+++ b/banyand/property/repair_gossip.go
@@ -57,6 +57,13 @@ func (b *repairGossipBase) getTreeReader(ctx 
context.Context, group string, shar
                if err != nil {
                        return nil, false, fmt.Errorf("failed to check state 
file existence for group %s: %w", group, err)
                }
+               if !stateExist {
+                       // check whether a tree build has been scheduled before
+                       stateExist, err = b.scheduler.checkHasBuildTree()
+                       if err != nil {
+                               return nil, false, fmt.Errorf("failed to check 
if the tree state file exists: %w", err)
+                       }
+               }
                // if the tree is nil, it means the tree is no data
                return &emptyRepairTreeReader{}, stateExist, nil
        }
@@ -198,7 +205,9 @@ func (r *repairGossipClient) Rev(ctx context.Context, 
tracer gossip.Trace, nextN
        startSyncSpan.Tag(gossip.TraceTagTargetNode, nextNode.Target())
        client := propertyv1.NewRepairServiceClient(nextNode)
        var hasPropertyUpdated bool
+       r.scheduler.setGossipRepairing(true)
        defer func() {
+               r.scheduler.setGossipRepairing(false)
                if err != nil {
                        startSyncSpan.Tag("has_property_updates", 
fmt.Sprintf("%t", hasPropertyUpdated))
                        startSyncSpan.Error(err.Error())
@@ -243,6 +252,7 @@ func (r *repairGossipClient) Rev(ctx context.Context, 
tracer gossip.Trace, nextN
        sendTreeSummarySpan.End()
        // if the root node matched, then ignore the repair
        if rootMatch {
+               r.scheduler.l.Debug().Msgf("tree root for group %s, shard %d 
matched, no need to repair", request.Group, request.ShardId)
                return nil
        }
 
@@ -259,6 +269,7 @@ func (r *repairGossipClient) Rev(ctx context.Context, 
tracer gossip.Trace, nextN
                recvResp, err := stream.Recv()
                if err != nil {
                        if errors.Is(err, io.EOF) {
+                               r.scheduler.l.Debug().Msgf("no more messages 
from server, client side finished syncing properties for group %s, shard %d", 
request.Group, request.ShardId)
                                return nil
                        }
                        return fmt.Errorf("failed to keep receive tree summary 
from server: %w", err)
@@ -274,27 +285,38 @@ func (r *repairGossipClient) Rev(ctx context.Context, 
tracer gossip.Trace, nextN
                                differSpan.End()
                                return nil
                        }
+                       r.scheduler.l.Debug().Msgf("received differ tree 
summary from server, nodes count: %d", len(resp.DifferTreeSummary.Nodes))
                        r.handleDifferSummaryFromServer(ctx, stream, 
resp.DifferTreeSummary, reader, syncShard, rootNode, leafReader, 
&notProcessingClientNode, &currentComparingClientNode)
                        firstTreeSummaryResp = false
+
+                       if err = stream.Send(&propertyv1.RepairRequest{
+                               Data: &propertyv1.RepairRequest_WaitNextDiffer{
+                                       WaitNextDiffer: 
&propertyv1.WaitNextDifferData{},
+                               },
+                       }); err != nil {
+                               differSpan.Error(err.Error())
+                               r.scheduler.l.Warn().Err(err).Msgf("failed to 
send wait next differ request to server, group: %s, shard: %d", request.Group, 
request.ShardId)
+                       }
                        differSpan.End()
                case *propertyv1.RepairResponse_PropertySync:
+                       r.scheduler.l.Debug().Msgf("received repair response 
from server")
                        // step 3: keep receiving messages from the server
                        // if the server still sending different nodes, we 
should keep reading them
                        // if the server sends a PropertySync, we should repair 
the property and send the newer property back to the server if needed
                        sync := resp.PropertySync
                        syncSpan := tracer.CreateSpan(startSyncSpan, "repair 
property")
                        syncSpan.Tag(gossip.TraceTagOperateType, 
gossip.TraceTagOperateRepairProperty)
-                       syncSpan.Tag(gossip.TraceTagPropertyID, string(sync.Id))
-                       updated, newer, err := syncShard.repair(ctx, sync.Id, 
sync.Property, sync.DeleteTime)
+                       syncSpan.Tag(gossip.TraceTagPropertyID, 
string(sync.Property.Id))
+                       updated, newer, err := syncShard.repair(ctx, 
sync.Property.Id, sync.Property.Property, sync.Property.DeleteTime)
                        syncSpan.Tag("updated", fmt.Sprintf("%t", updated))
                        syncSpan.Tag("has_newer", fmt.Sprintf("%t", newer != 
nil))
                        if err != nil {
                                syncSpan.Error(err.Error())
-                               r.scheduler.l.Warn().Err(err).Msgf("failed to 
repair property %s", sync.Id)
+                               r.scheduler.l.Warn().Err(err).Msgf("failed to 
repair property %s", sync.Property.Id)
                                
r.scheduler.metrics.totalRepairFailedCount.Inc(1, request.Group, 
fmt.Sprintf("%d", request.ShardId))
                        }
                        if updated {
-                               r.scheduler.l.Debug().Msgf("successfully 
repaired property %s on client side", sync.Id)
+                               r.scheduler.l.Debug().Msgf("successfully 
repaired property %s on client side", sync.Property.Id)
                                
r.scheduler.metrics.totalRepairSuccessCount.Inc(1, request.Group, 
fmt.Sprintf("%d", request.ShardId))
                                hasPropertyUpdated = true
                                syncSpan.End()
@@ -302,7 +324,10 @@ func (r *repairGossipClient) Rev(ctx context.Context, 
tracer gossip.Trace, nextN
                        }
                        // if the property hasn't been updated, and the newer 
property is not nil,
                        // which means the property is newer than the server 
side,
-                       if !updated && newer != nil {
+
+                       // if sync.From is PROPERTY_SYNC_FROM_TYPE_MISSING, the server pushed the property because the client was missing it,
+                       // but the client now holds a newer version, so it is already up to date and there is no need to send the property back to the server
+                       if !updated && newer != nil && sync.From != 
propertyv1.PropertySyncFromType_PROPERTY_SYNC_FROM_TYPE_MISSING {
                                var p propertyv1.Property
                                err = protojson.Unmarshal(newer.source, &p)
                                if err != nil {
@@ -371,14 +396,7 @@ func (r *repairGossipClient) handleDifferSummaryFromServer(
                        // then queried and sent property to server
                        r.queryPropertyAndSendToServer(ctx, syncShard, 
(*notProcessingClientNode).entity, stream)
                }
-               err := stream.Send(&propertyv1.RepairRequest{
-                       Data: &propertyv1.RepairRequest_NoMorePropertySync{
-                               NoMorePropertySync: 
&propertyv1.NoMorePropertySync{},
-                       },
-               })
-               if err != nil {
-                       r.scheduler.l.Warn().Err(err).Msgf("failed to send no 
more property sync request to server")
-               }
+               r.scheduler.l.Debug().Msg("no more properties to sync to the server")
                return
        }
 
@@ -568,7 +586,11 @@ func newRepairGossipServer(s *repairScheduler) 
*repairGossipServer {
        }
 }
 
-func (r *repairGossipServer) Repair(s 
grpclib.BidiStreamingServer[propertyv1.RepairRequest, 
propertyv1.RepairResponse]) error {
+func (r *repairGossipServer) Repair(s 
grpclib.BidiStreamingServer[propertyv1.RepairRequest, 
propertyv1.RepairResponse]) (err error) {
+       r.scheduler.setGossipRepairing(true)
+       defer func() {
+               r.scheduler.setGossipRepairing(false)
+       }()
        summary, reader, err := r.combineTreeSummary(s)
        if err != nil {
                return fmt.Errorf("failed to receive tree summary request: %w", 
err)
@@ -583,6 +605,9 @@ func (r *repairGossipServer) Repair(s 
grpclib.BidiStreamingServer[propertyv1.Rep
        shardID := summary.shardID
        var hasPropertyUpdated bool
        defer func() {
+               if err != nil {
+                       r.scheduler.l.Warn().Err(err).Msgf("server failed to 
repair gossip for group %s, shard %d", group, shardID)
+               }
                if hasPropertyUpdated {
                        err = 
r.scheduler.buildingTree([]common.ShardID{common.ShardID(shardID)}, group, true)
                        if err != nil {
@@ -621,42 +646,10 @@ func (r *repairGossipServer) Repair(s 
grpclib.BidiStreamingServer[propertyv1.Rep
                        serverMissingSlots = append(serverMissingSlots, 
clientSlot.index)
                }
        }
-       sent, err := r.sendDifferSlots(reader, clientMismatchSlots, 
serverMissingSlots, s)
-       if err != nil {
-               r.scheduler.l.Warn().Err(err).Msgf("failed to send different 
slots to client")
-       }
-       // send the tree and no more different slots needs to be sent
-       err = r.sendEmptyDiffer(s)
-       if !sent {
-               return err
-       } else if err != nil {
-               // should keep the message receiving loop
-               r.scheduler.l.Warn().Msgf("sent no difference slot to client 
failure, error: %v", err)
-       }
-       for {
-               missingOrSyncRequest, err := s.Recv()
-               if err != nil {
-                       if errors.Is(err, io.EOF) {
-                               return nil
-                       }
-                       return fmt.Errorf("failed to receive missing or sync 
request: %w", err)
-               }
-               syncShard, err := r.scheduler.db.loadShard(s.Context(), 
common.ShardID(shardID))
-               if err != nil {
-                       return fmt.Errorf("shard %d load failure on server 
side: %w", shardID, err)
-               }
-               switch req := missingOrSyncRequest.Data.(type) {
-               case *propertyv1.RepairRequest_PropertyMissing:
-                       r.processPropertyMissing(s.Context(), syncShard, 
req.PropertyMissing, s)
-               case *propertyv1.RepairRequest_PropertySync:
-                       if r.processPropertySync(s.Context(), syncShard, 
req.PropertySync, s, group) {
-                               hasPropertyUpdated = true
-                       }
-               case *propertyv1.RepairRequest_NoMorePropertySync:
-                       // if the client has no more property sync, the server 
side should stop the sync
-                       return nil
-               }
-       }
+
+       // send differ slots to the client and wait for the client to process them
+       r.sendDifferSlots(reader, clientMismatchSlots, serverMissingSlots, 
group, shardID, &hasPropertyUpdated, s)
+       return nil
 }
 
 func (r *repairGossipServer) combineTreeSummary(
@@ -770,13 +763,17 @@ func (r *repairGossipServer) processPropertySync(
                // send the newer property to the client
                err = s.Send(&propertyv1.RepairResponse{
                        Data: &propertyv1.RepairResponse_PropertySync{
-                               PropertySync: &propertyv1.PropertySync{
-                                       Id:         newer.id,
-                                       Property:   &p,
-                                       DeleteTime: newer.deleteTime,
+                               PropertySync: &propertyv1.PropertySyncWithFrom{
+                                       From: 
propertyv1.PropertySyncFromType_PROPERTY_SYNC_FROM_TYPE_SYNC,
+                                       Property: &propertyv1.PropertySync{
+                                               Id:         newer.id,
+                                               Property:   &p,
+                                               DeleteTime: newer.deleteTime,
+                                       },
                                },
                        },
                })
+               r.scheduler.l.Debug().Msgf("sending repaired property %s on 
server side", sync.Id)
                if err != nil {
                        r.scheduler.l.Warn().Err(err).Msgf("failed to send 
newer property sync response to client, entity: %s", newer.id)
                        return false
@@ -801,13 +798,17 @@ func (r *repairGossipServer) processPropertyMissing(
        }
        err = s.Send(&propertyv1.RepairResponse{
                Data: &propertyv1.RepairResponse_PropertySync{
-                       PropertySync: &propertyv1.PropertySync{
-                               Id:         property.id,
-                               Property:   data,
-                               DeleteTime: property.deleteTime,
+                       PropertySync: &propertyv1.PropertySyncWithFrom{
+                               From: 
propertyv1.PropertySyncFromType_PROPERTY_SYNC_FROM_TYPE_MISSING,
+                               Property: &propertyv1.PropertySync{
+                                       Id:         property.id,
+                                       Property:   data,
+                                       DeleteTime: property.deleteTime,
+                               },
                        },
                },
        })
+       r.scheduler.l.Debug().Msgf("sending missing property on server side: 
%s", property.id)
        if err != nil {
                r.scheduler.l.Warn().Err(err).Msgf("failed to send property 
sync response to client, entity: %s", missing.Entity)
                return
@@ -818,16 +819,21 @@ func (r *repairGossipServer) sendDifferSlots(
        reader repairTreeReader,
        clientMismatchSlots []*repairTreeNode,
        serverMissingSlots []int32,
+       group string,
+       shardID uint32,
+       hasPropertyUpdated *bool,
        s grpclib.BidiStreamingServer[propertyv1.RepairRequest, 
propertyv1.RepairResponse],
-) (hasSent bool, err error) {
+) {
        var leafNodes []*repairTreeNode
+       var err error
 
        // send server mismatch slots to the client
        for _, node := range clientMismatchSlots {
                for {
                        leafNodes, err = reader.read(node, 
gossipMerkleTreeReadPageSize, false)
                        if err != nil {
-                               return hasSent, fmt.Errorf("failed to read leaf 
nodes for slot %d: %w", node.slotInx, err)
+                               r.scheduler.l.Warn().Err(err).Msgf("failed to 
read leaf nodes for slot %d", node.slotInx)
+                               continue
                        }
                        // if there are no more leaf nodes, we can skip this 
slot
                        if len(leafNodes) == 0 {
@@ -853,8 +859,12 @@ func (r *repairGossipServer) sendDifferSlots(
                        if err != nil {
                                r.scheduler.l.Warn().Err(err).
                                        Msgf("failed to send leaf nodes for 
slot %d", node.slotInx)
-                       } else {
-                               hasSent = true
+                               continue
+                       }
+
+                       if err = r.recvMsgAndWaitReadNextDiffer(s, group, 
shardID, hasPropertyUpdated); err != nil {
+                               r.scheduler.l.Warn().Err(err).Msgf("failed to wait for the client to finish processing the differ")
+                               return
                        }
                }
        }
@@ -879,11 +889,56 @@ func (r *repairGossipServer) sendDifferSlots(
                if err != nil {
                        r.scheduler.l.Warn().Err(err).
                                Msgf("failed to send missing slots")
-               } else {
-                       hasSent = true
+                       return
+               }
+               r.scheduler.l.Debug().Msg("successfully sent differ-tree 
summary to client")
+               if err = r.recvMsgAndWaitReadNextDiffer(s, group, shardID, 
hasPropertyUpdated); err != nil {
+                       r.scheduler.l.Warn().Err(err).Msgf("failed to wait for the client to finish processing the differ")
+                       return
+               }
+       }
+
+       // send the empty differ response to the client to make sure the client 
side finished processing
+       if err = r.sendEmptyDiffer(s); err != nil {
+               r.scheduler.l.Warn().Err(err).Msgf("failed to send empty differ 
response to client")
+               return
+       }
+       if err = r.recvMsgAndWaitReadNextDiffer(s, group, shardID, 
hasPropertyUpdated); err != nil {
+               r.scheduler.l.Warn().Err(err).Msgf("failed to wait for the client to finish processing the empty differ")
+               return
+       }
+}
+
+func (r *repairGossipServer) recvMsgAndWaitReadNextDiffer(
+       s grpclib.BidiStreamingServer[propertyv1.RepairRequest, 
propertyv1.RepairResponse],
+       group string,
+       shardID uint32,
+       hasPropertyUpdated *bool,
+) error {
+       for {
+               missingOrSyncRequest, err := s.Recv()
+               if err != nil {
+                       if errors.Is(err, io.EOF) {
+                               r.scheduler.l.Debug().Msgf("client closed the 
stream, no more missing or sync request")
+                               return nil
+                       }
+                       return fmt.Errorf("failed to receive missing or sync 
request: %w", err)
+               }
+               syncShard, err := r.scheduler.db.loadShard(s.Context(), 
common.ShardID(shardID))
+               if err != nil {
+                       return fmt.Errorf("shard %d load failure on server 
side: %w", shardID, err)
+               }
+               switch req := missingOrSyncRequest.Data.(type) {
+               case *propertyv1.RepairRequest_PropertyMissing:
+                       r.processPropertyMissing(s.Context(), syncShard, 
req.PropertyMissing, s)
+               case *propertyv1.RepairRequest_PropertySync:
+                       if r.processPropertySync(s.Context(), syncShard, 
req.PropertySync, s, group) {
+                               *hasPropertyUpdated = true
+                       }
+               case *propertyv1.RepairRequest_WaitNextDiffer:
+                       return nil
                }
        }
-       return hasSent, nil
 }
 
 func (r *repairGossipServer) sendEmptyDiffer(s 
grpclib.BidiStreamingServer[propertyv1.RepairRequest, 
propertyv1.RepairResponse]) error {
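
Taken together, the repair_gossip.go changes replace the old one-shot exchange (send all differ slots, then NoMorePropertySync) with per-batch flow control: the server sends one DifferTreeSummary, then blocks in recvMsgAndWaitReadNextDiffer, serving PropertyMissing/PropertySync requests until the client answers with WaitNextDiffer, and only then sends the next batch. Below is a condensed, illustrative sketch of the client side of that loop; the generated stream and oneof names are assumed from the code above, and handleDiffer/applySync are hypothetical helpers, not the commit's implementation.

package main

import (
	"errors"
	"io"

	propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
	"google.golang.org/grpc"
)

type repairStream = grpc.BidiStreamingClient[propertyv1.RepairRequest, propertyv1.RepairResponse]

func clientLoop(
	stream repairStream,
	handleDiffer func(*propertyv1.DifferTreeSummary), // sends PropertyMissing / PropertySync upstream
	applySync func(*propertyv1.PropertySyncWithFrom), // repairs the local copy, may push a newer one back
) error {
	for {
		resp, err := stream.Recv()
		if err != nil {
			if errors.Is(err, io.EOF) {
				return nil // server closed the stream: this repair round is finished
			}
			return err
		}
		switch data := resp.Data.(type) {
		case *propertyv1.RepairResponse_DifferTreeSummary:
			handleDiffer(data.DifferTreeSummary)
			// flow control: tell the server this batch is done before it sends the next one
			if err := stream.Send(&propertyv1.RepairRequest{
				Data: &propertyv1.RepairRequest_WaitNextDiffer{WaitNextDiffer: &propertyv1.WaitNextDifferData{}},
			}); err != nil {
				return err
			}
		case *propertyv1.RepairResponse_PropertySync:
			applySync(data.PropertySync)
		}
	}
}

func main() {} // sketch only; the real loop lives in repairGossipClient.Rev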
diff --git a/bydbctl/internal/cmd/property_test.go 
b/bydbctl/internal/cmd/property_test.go
index cb773acb..16675539 100644
--- a/bydbctl/internal/cmd/property_test.go
+++ b/bydbctl/internal/cmd/property_test.go
@@ -701,6 +701,220 @@ projection:
        })
 })
 
+var _ = Describe("Property Cluster Resilience with 5 Data Nodes", func() {
+       Expect(logger.Init(logger.Logging{
+               Env:   "dev",
+               Level: flags.LogLevel,
+       })).To(Succeed())
+
+       var addr string
+       var deferFunc func()
+       var rootCmd *cobra.Command
+       var nodeIDs []string
+       var nodeRepairAddrs []string
+       var nodeDirs []string
+       var closeNodes []func()
+       var messenger gossip.Messenger
+       var server embeddedetcd.Server
+       var ep string
+       nodeCount := 5
+       closedNodeCount := 3
+
+       BeforeEach(func() {
+               rootCmd = &cobra.Command{Use: "root"}
+               cmd.RootCmdFlags(rootCmd)
+               var ports []int
+               var err error
+               var spaceDefs []func()
+
+               // Create 5 data nodes
+               nodeIDs = make([]string, nodeCount)
+               nodeRepairAddrs = make([]string, nodeCount)
+               nodeDirs = make([]string, nodeCount)
+               closeNodes = make([]func(), nodeCount)
+               spaceDefs = make([]func(), nodeCount)
+
+               // Create data directories for 5 nodes
+               for i := 0; i < nodeCount; i++ {
+                       nodeDirs[i], spaceDefs[i], err = test.NewSpace()
+                       Expect(err).NotTo(HaveOccurred())
+               }
+
+               // Setup cluster with etcd server
+               By("Starting etcd server")
+               ports, err = test.AllocateFreePorts(2)
+               Expect(err).NotTo(HaveOccurred())
+               dir, spaceDef, err := test.NewSpace()
+               Expect(err).NotTo(HaveOccurred())
+               ep = fmt.Sprintf("http://127.0.0.1:%d", ports[0])
+               server, err = embeddedetcd.NewServer(
+                       embeddedetcd.ConfigureListener([]string{ep}, 
[]string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}),
+                       embeddedetcd.RootDir(dir),
+               )
+               Expect(err).ShouldNot(HaveOccurred())
+               <-server.ReadyNotify()
+
+               // Start 5 data nodes
+               for i := 0; i < nodeCount; i++ {
+                       By(fmt.Sprintf("Starting data node %d", i))
+                       nodeIDs[i], nodeRepairAddrs[i], closeNodes[i] = 
setup.DataNodeFromDataDir(ep, nodeDirs[i],
+                               "--property-repair-enabled=true", 
"--property-repair-quick-build-tree-time=1s")
+                       // Update node ID to use 127.0.0.1
+                       _, nodePort, found := strings.Cut(nodeIDs[i], ":")
+                       Expect(found).To(BeTrue())
+                       nodeIDs[i] = fmt.Sprintf("127.0.0.1:%s", nodePort)
+               }
+
+               By("Starting liaison node")
+               _, liaisonHTTPAddr, closerLiaisonNode := 
setup.LiaisonNodeWithHTTP(ep)
+               addr = httpSchema + liaisonHTTPAddr
+
+               By("Creating test group with shard=1, copies=5")
+               defUITemplateWithSchema(rootCmd, addr, 1, nodeCount)
+
+               // Setup gossip messenger
+               messenger = 
gossip.NewMessengerWithoutMetadata(observability.NewBypassRegistry(), 9999)
+               messenger.Validate()
+               err = messenger.PreRun(context.WithValue(context.Background(), 
common.ContextNodeKey, common.Node{
+                       NodeID: "test-client",
+               }))
+               Expect(err).NotTo(HaveOccurred())
+
+               for i := 0; i < nodeCount; i++ {
+                       registerNodeToMessenger(messenger, nodeIDs[i], 
nodeRepairAddrs[i])
+               }
+
+               deferFunc = func() {
+                       messenger.GracefulStop()
+                       closerLiaisonNode()
+                       for i := 0; i < nodeCount; i++ {
+                               if closeNodes[i] != nil {
+                                       closeNodes[i]()
+                               }
+                       }
+                       _ = server.Close()
+                       <-server.StopNotify()
+                       spaceDef()
+                       for i := 0; i < nodeCount; i++ {
+                               spaceDefs[i]()
+                       }
+               }
+       })
+
+       AfterEach(func() {
+               deferFunc()
+       })
+
+       It("should handle node failures and repairs correctly", func() {
+               By("Writing initial test data")
+               beforeFirstWrite := time.Now()
+               applyData(rootCmd, addr, p1YAML, true, propertyTagCount)
+
+               By("Verifying data can be queried initially")
+               queryData(rootCmd, addr, propertyGroup, property1ID, 1, 
func(data string, resp *propertyv1.QueryResponse) {
+                       Expect(data).To(ContainSubstring("foo111"))
+               })
+
+               By("Verifying repair tree regeneration after first write")
+               waitForRepairTreeRegeneration(nodeDirs, propertyGroup, 
beforeFirstWrite)
+
+               By(fmt.Sprintf("Closing %d nodes", closedNodeCount))
+               for i := 0; i < closedNodeCount; i++ {
+                       GinkgoWriter.Printf("Closing node %d\n", i)
+                       closeNodes[i]()
+                       closeNodes[i] = nil
+               }
+
+               By(fmt.Sprintf("Verifying data can still be queried after 
closing %d nodes", closedNodeCount))
+               queryData(rootCmd, addr, propertyGroup, property1ID, 1, 
func(data string, resp *propertyv1.QueryResponse) {
+                       Expect(data).To(ContainSubstring("foo111"))
+               })
+
+               By(fmt.Sprintf("Writing new test data with %d nodes down", 
closedNodeCount))
+               beforeSecondWrite := time.Now()
+               applyData(rootCmd, addr, p3YAML, true, propertyTagCount)
+
+               By("Verifying new data can be queried")
+               queryData(rootCmd, addr, propertyGroup, property2ID, 1, 
func(data string, resp *propertyv1.QueryResponse) {
+                       Expect(data).To(ContainSubstring("foo-mesh"))
+               })
+
+               By("Verifying repair tree regeneration on remaining nodes after 
second write")
+               
waitForRepairTreeRegeneration(nodeDirs[closedNodeCount:nodeCount], 
propertyGroup, beforeSecondWrite)
+
+               By(fmt.Sprintf("Restarting the %d closed nodes with existing 
data directories", closedNodeCount))
+               for i := 0; i < closedNodeCount; i++ {
+                       GinkgoWriter.Printf("Restarting node %d\n", i)
+                       nodeIDs[i], nodeRepairAddrs[i], closeNodes[i] = 
setup.DataNodeFromDataDir(ep, nodeDirs[i],
+                               "--property-repair-enabled=true", 
"--property-repair-quick-build-tree-time=1s")
+                       // Update node ID to use 127.0.0.1
+                       _, nodePort, found := strings.Cut(nodeIDs[i], ":")
+                       Expect(found).To(BeTrue())
+                       nodeIDs[i] = fmt.Sprintf("127.0.0.1:%s", nodePort)
+                       // Re-register to messenger
+                       registerNodeToMessenger(messenger, nodeIDs[i], 
nodeRepairAddrs[i])
+               }
+
+               By("Triggering repair operations")
+               err := messenger.Propagation(nodeIDs, propertyGroup, 0)
+               Expect(err).NotTo(HaveOccurred())
+
+               By("Verifying repair tree regeneration after repair operations")
+               waitForRepairTreeRegeneration(nodeDirs, propertyGroup, 
beforeSecondWrite)
+
+               By("Closing all nodes before checking data consistency")
+               for i := 0; i < nodeCount; i++ {
+                       if closeNodes[i] != nil {
+                               GinkgoWriter.Printf("Closing node %d for data 
consistency check\n", i)
+                               closeNodes[i]()
+                               closeNodes[i] = nil
+                       }
+               }
+
+               By("Verifying data consistency across all nodes after repair")
+               Eventually(func() bool {
+                       allNodesConsistent := true
+                       for i := 0; i < nodeCount; i++ {
+                               store, err := generateInvertedStore(nodeDirs[i])
+                               if err != nil {
+                                       GinkgoWriter.Printf("Node %d store 
error: %v\n", i, err)
+                                       allNodesConsistent = false
+                                       continue
+                               }
+                               query, err := 
inverted.BuildPropertyQuery(&propertyv1.QueryRequest{
+                                       Groups: []string{propertyGroup},
+                               }, "_group", "_entity_id")
+                               if err != nil {
+                                       GinkgoWriter.Printf("Node %d query 
build error: %v\n", i, err)
+                                       allNodesConsistent = false
+                                       continue
+                               }
+                               searchResult, err := 
store.Search(context.Background(), []index.FieldKey{sourceFieldKey, 
deletedFieldKey}, query, 10)
+                               if err != nil {
+                                       GinkgoWriter.Printf("Node %d search 
error: %v\n", i, err)
+                                       allNodesConsistent = false
+                                       continue
+                               }
+
+                               // Filter non-deleted properties
+                               nonDeletedCount := 0
+                               for _, result := range searchResult {
+                                       deleted := 
convert.BytesToBool(result.Fields[deletedFieldKey.TagName])
+                                       if !deleted {
+                                               nonDeletedCount++
+                                       }
+                               }
+
+                               GinkgoWriter.Printf("Node %d has %d total 
properties, %d non-deleted\n", i, len(searchResult), nonDeletedCount)
+                               if nonDeletedCount < 2 {
+                                       allNodesConsistent = false
+                               }
+                       }
+                       return allNodesConsistent
+               }, flags.EventuallyTimeout).Should(BeTrue())
+       })
+})
+
 func filterProperties(doc []index.SeriesDocument, filter func(property 
*propertyv1.Property, deleted bool) bool) (res []*propertyv1.Property) {
        for _, p := range doc {
                deleted := 
convert.BytesToBool(p.Fields[deletedFieldKey.TagName])
@@ -867,3 +1081,33 @@ func registerNodeToMessenger(m gossip.Messenger, nodeID, 
gossipRepairAddr string
                },
        })
 }
+
+func getRepairTreeFilePath(nodeDir, group string) string {
+       return path.Join(nodeDir, "property", "repairs", "shard0", 
fmt.Sprintf("state-tree-%s.data", group))
+}
+
+func getRepairTreeModTime(nodeDir, group string) (time.Time, error) {
+       filePath := getRepairTreeFilePath(nodeDir, group)
+       info, err := os.Stat(filePath)
+       if err != nil {
+               return time.Time{}, err
+       }
+       return info.ModTime(), nil
+}
+
+func waitForRepairTreeRegeneration(nodeDirs []string, group string, beforeTime 
time.Time) {
+       Eventually(func() bool {
+               allRegenerated := true
+               for _, nodeDir := range nodeDirs {
+                       modTime, err := getRepairTreeModTime(nodeDir, group)
+                       if err != nil {
+                               allRegenerated = false
+                               continue
+                       }
+                       if !modTime.After(beforeTime) {
+                               allRegenerated = false
+                       }
+               }
+               return allRegenerated
+       }, flags.EventuallyTimeout).Should(BeTrue(), "All nodes should 
regenerate repair tree after data write")
+}
diff --git a/docs/api-reference.md b/docs/api-reference.md
index 2fb6bbd0..e7d778cc 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -258,9 +258,9 @@
   
 - [banyandb/property/v1/repair.proto](#banyandb_property_v1_repair-proto)
     - [DifferTreeSummary](#banyandb-property-v1-DifferTreeSummary)
-    - [NoMorePropertySync](#banyandb-property-v1-NoMorePropertySync)
     - [PropertyMissing](#banyandb-property-v1-PropertyMissing)
     - [PropertySync](#banyandb-property-v1-PropertySync)
+    - [PropertySyncWithFrom](#banyandb-property-v1-PropertySyncWithFrom)
     - [RepairRequest](#banyandb-property-v1-RepairRequest)
     - [RepairResponse](#banyandb-property-v1-RepairResponse)
     - [RootCompare](#banyandb-property-v1-RootCompare)
@@ -268,6 +268,9 @@
     - [TreeRoot](#banyandb-property-v1-TreeRoot)
     - [TreeSlotSHA](#banyandb-property-v1-TreeSlotSHA)
     - [TreeSlots](#banyandb-property-v1-TreeSlots)
+    - [WaitNextDifferData](#banyandb-property-v1-WaitNextDifferData)
+  
+    - [PropertySyncFromType](#banyandb-property-v1-PropertySyncFromType)
   
     - [RepairService](#banyandb-property-v1-RepairService)
   
@@ -3897,16 +3900,6 @@ Property stores the user defined data
 
 
 
-<a name="banyandb-property-v1-NoMorePropertySync"></a>
-
-### NoMorePropertySync
-
-
-
-
-
-
-
 <a name="banyandb-property-v1-PropertyMissing"></a>
 
 ### PropertyMissing
@@ -3939,6 +3932,22 @@ Property stores the user defined data
 
 
 
+<a name="banyandb-property-v1-PropertySyncWithFrom"></a>
+
+### PropertySyncWithFrom
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| from | [PropertySyncFromType](#banyandb-property-v1-PropertySyncFromType) |  
|  |
+| property | [PropertySync](#banyandb-property-v1-PropertySync) |  |  |
+
+
+
+
+
+
 <a name="banyandb-property-v1-RepairRequest"></a>
 
 ### RepairRequest
@@ -3951,7 +3960,7 @@ Property stores the user defined data
 | tree_slots | [TreeSlots](#banyandb-property-v1-TreeSlots) |  |  |
 | property_missing | [PropertyMissing](#banyandb-property-v1-PropertyMissing) 
|  | repair stage case 1: client missing but server existing |
 | property_sync | [PropertySync](#banyandb-property-v1-PropertySync) |  | case 
2: client existing but server missing case 3: SHA value mismatches |
-| no_more_property_sync | 
[NoMorePropertySync](#banyandb-property-v1-NoMorePropertySync) |  | if client 
side is already send all the properties(missing or property sync) which means 
the client side will not sending more properties to sync, server side should 
close the stream. |
+| wait_next_differ | 
[WaitNextDifferData](#banyandb-property-v1-WaitNextDifferData) |  | wait next 
differ tree summary for process |
 
 
 
@@ -3968,7 +3977,7 @@ Property stores the user defined data
 | ----- | ---- | ----- | ----------- |
 | root_compare | [RootCompare](#banyandb-property-v1-RootCompare) |  | compare 
stage |
 | differ_tree_summary | 
[DifferTreeSummary](#banyandb-property-v1-DifferTreeSummary) |  |  |
-| property_sync | [PropertySync](#banyandb-property-v1-PropertySync) |  | 
repair stage case 1: return from PropertyMissing case 3: return if the client 
is older |
+| property_sync | 
[PropertySyncWithFrom](#banyandb-property-v1-PropertySyncWithFrom) |  | repair 
stage case 1: return from PropertyMissing case 3: return if the client is older 
|
 
 
 
@@ -4056,8 +4065,31 @@ Property stores the user defined data
 
 
 
+
+<a name="banyandb-property-v1-WaitNextDifferData"></a>
+
+### WaitNextDifferData
+
+
+
+
+
+
  
 
+
+<a name="banyandb-property-v1-PropertySyncFromType"></a>
+
+### PropertySyncFromType
+
+
+| Name | Number | Description |
+| ---- | ------ | ----------- |
+| PROPERTY_SYNC_FROM_TYPE_UNSPECIFIED | 0 |  |
+| PROPERTY_SYNC_FROM_TYPE_MISSING | 1 | client missing but server existing |
+| PROPERTY_SYNC_FROM_TYPE_SYNC | 2 | client existing but server missing or SHA 
value mismatches |
+
+
  
 
  
diff --git a/test/docker/base-compose.yml b/test/docker/base-compose.yml
index 0488a9a7..d583fc0f 100644
--- a/test/docker/base-compose.yml
+++ b/test/docker/base-compose.yml
@@ -47,6 +47,7 @@ services:
       - 17912
       - 2121
       - 6060
+      - 17932
     command: data --etcd-endpoints=http://etcd:2379
     healthcheck:
       test: ["CMD", "./bydbctl", "health", "--addr=http://127.0.0.1:17913";]
diff --git a/test/property_repair/README.md b/test/property_repair/README.md
new file mode 100644
index 00000000..56415d43
--- /dev/null
+++ b/test/property_repair/README.md
@@ -0,0 +1,142 @@
+# Property Repair Benchmark Test
+
+## Purpose
+
+This performance evaluation is designed to test the execution efficiency of 
the backend’s automated Property repair feature. 
+It is run under the following configuration:
+1. The cluster is configured with a maximum of three nodes, one group, one 
shard, and two replicas.
+2. Each shard contains **100,000** records, with each record approximately 
**2KB** in size.
+
+## Requirements
+
+- Docker and Docker Compose
+- Go 1.21+
+
+### Building the BanyanDB Docker Image
+
+Please make sure you have the latest version of the BanyanDB codebase, and 
building the Docker image is essential before running the tests.
+
+```bash
+export TARGET_OS=linux
+export PLATFORMS=linux/arm64 # please replace to your platform
+make clean && make generate && make release && make docker.build
+```
+
+## Monitoring
+
+The performance evaluation is primarily conducted by observing logs and 
monitoring metrics in Prometheus. 
+
+The logs provide clear markers for the start and end times of the backend 
repair process.
+In Prometheus, available at `http://localhost:9090`, you can view system performance metrics for each node in the cluster.
+
+### CPU Usage Monitoring
+
+Use this PromQL query to monitor CPU usage during property repair:
+
+```promql
+avg by (instance) (
+  rate(process_cpu_seconds_total[1m]) * 100
+)
+```
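+
+The integrated tests read these metrics programmatically through the helpers added in `test/property_repair/shared_utils.go`. The following is a minimal sketch (not part of the test suite) that assumes the `GetAllNodeMetrics` and `PrintMetricsComparison` helpers and the metric fields introduced by this patch:
+
+```go
+// Sketch: take a baseline of the per-node metrics, then check whether any node
+// has finished a repair cycle since the baseline was captured.
+package main
+
+import (
+    "fmt"
+
+    propertyrepair "github.com/apache/skywalking-banyandb/test/property_repair"
+)
+
+func main() {
+    before := propertyrepair.GetAllNodeMetrics() // baseline, taken before the repair cycle
+    // ... wait for the scheduled repair cycle to run ...
+    after := propertyrepair.GetAllNodeMetrics()
+    propertyrepair.PrintMetricsComparison(before, after)
+    for i, b := range before {
+        a := after[i]
+        if a.TotalPropagationCount > b.TotalPropagationCount && a.RepairSuccessCount > b.RepairSuccessCount {
+            fmt.Printf("node %s completed at least one repair\n", a.NodeName)
+        }
+    }
+}
+```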
+
+## Case 1: Full Data Property Repair
+
+In the first test case, a brand-new, empty node will be started, 
+and **100,000** records will be synchronized to it in a single batch. 
+This test is designed to measure the node's CPU usage and the total time 
consumed during the process.
+
+### Running the Integrated Test
+
+The full data test case runs as an integrated test that handles all steps 
automatically:
+
+```bash
+cd test/property_repair/full_data
+# Run the complete integrated test
+go test . -v -timeout 3h -count=1
+```
+
+The test performs the following steps:
+1. Starts a 3-node cluster using docker-compose
+2. Creates a group with 1 replica and loads 100,000 properties
+3. Updates the group to 2 replicas to trigger property repair
+4. Monitors the repair process through Prometheus metrics
+5. Verifies both propagation count and repair success count increase
+
+The test then waits for the propagation and repair to complete across the cluster; a condensed sketch of this flow is shown below.
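+
+For reference, the core of this flow can be expressed with the shared helpers under `test/property_repair` (a simplified, hedged sketch of what `full_data/integrated_test.go` does, not the exact test code):
+
+```go
+// Sketch of the full-data case: load 100,000 properties while the group has a
+// single replica, then raise the replica count so the node holding no data for
+// the group gets repaired in the background.
+package main
+
+import (
+    "context"
+    "time"
+
+    "google.golang.org/grpc"
+    "google.golang.org/grpc/credentials/insecure"
+
+    databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+    propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+    "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+    propertyrepair "github.com/apache/skywalking-banyandb/test/property_repair"
+)
+
+func main() {
+    ctx := context.Background()
+    conn, err := grpchelper.Conn(propertyrepair.LiaisonAddr, 30*time.Second,
+        grpc.WithTransportCredentials(insecure.NewCredentials()))
+    if err != nil {
+        panic(err)
+    }
+    defer conn.Close()
+
+    groupClient := databasev1.NewGroupRegistryServiceClient(conn)
+    propertyClient := databasev1.NewPropertyRegistryServiceClient(conn)
+    propertyServiceClient := propertyv1.NewPropertyServiceClient(conn)
+
+    propertyrepair.CreateGroup(ctx, groupClient, 1)           // start with 1 replica
+    propertyrepair.CreatePropertySchema(ctx, propertyClient)  // register the property schema
+    if err := propertyrepair.WriteProperties(ctx, propertyServiceClient, 0, 100000); err != nil {
+        panic(err)
+    }
+    propertyrepair.UpdateGroupReplicas(ctx, groupClient, 2)   // trigger the background repair
+}
+```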
+
+### Result
+
+After waiting for the Property Repair process to complete, the following 
information was recorded:
+1. **Duration**: The total estimated time taken was approximately **36 
minutes**.
+2. **CPU Consumption**: The estimated CPU usage on the new node was about 
**1.4 CPU cores**.
+
+The detailed CPU usage rate is shown in the figure below.
+
+![CPU Usage](full_data/cpu-usage.png)
+
+## Case 2: Half-Data Property Repair
+
+In the second test case, three nodes are started, and the group’s number of copies is initially set to two.
+First, **50,000** records are written to all three nodes.
+Next, the group’s copies setting is changed to one, and the remaining **50,000** records are written to only two fixed nodes.
+At this point, the third node’s dataset is missing half of the data compared to the other nodes.
+Finally, the group’s copies setting is changed back to two, allowing the backend Property Repair process to perform the data synchronization automatically.
+
+### Running the Integrated Test
+
+This test case runs as an integrated test that handles all steps automatically:
+
+```bash
+cd test/property_repair/half_data
+# Run the complete integrated test
+go test . -v -timeout 3h -count=1
+```
+
+The test performs the following steps:
+1. Starts a 3-node cluster using docker-compose
+2. Creates a group with 2 replicas and loads 50,000 properties
+3. Reduces the group to 1 replica
+4. Writes additional 50,000 properties (creating data inconsistency)
+5. Increases the group back to 2 replicas to trigger property repair
+6. Monitors the repair process through Prometheus metrics
+7. Verifies both propagation count and repair success count increase
+
+The test then waits for the propagation and repair to complete across the cluster; a short sketch of the replica changes follows.
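+
+The replica changes that create the inconsistency can be summarized with the same helpers (a hedged fragment; the client setup is identical to the full-data sketch in Case 1):
+
+```go
+// Fragment: write half of the data with two copies, drop to one copy for the
+// second half (so one node misses those writes), then restore two copies so the
+// background repair has to catch that node up.
+func runHalfDataCase(
+    ctx context.Context,
+    groupClient databasev1.GroupRegistryServiceClient,
+    propertyClient databasev1.PropertyRegistryServiceClient,
+    propertyServiceClient propertyv1.PropertyServiceClient,
+) error {
+    propertyrepair.CreateGroup(ctx, groupClient, 2)
+    propertyrepair.CreatePropertySchema(ctx, propertyClient)
+    if err := propertyrepair.WriteProperties(ctx, propertyServiceClient, 0, 50000); err != nil {
+        return err
+    }
+    propertyrepair.UpdateGroupReplicas(ctx, groupClient, 1) // subsequent writes skip one node
+    if err := propertyrepair.WriteProperties(ctx, propertyServiceClient, 50000, 100000); err != nil {
+        return err
+    }
+    propertyrepair.UpdateGroupReplicas(ctx, groupClient, 2) // repair must fill in the gap
+    return nil
+}
+```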
+
+### Result
+
+After waiting for the Property Repair process to complete, the following 
information was recorded:
+1. **Duration**: The total estimated time taken was approximately **30 
minutes**.
+2. **CPU Consumption**: The estimated CPU usage on the new node was about 
**1.1 CPU cores**.
+
+The detailed CPU usage rate is shown in the figure below.
+
+![CPU Usage](half_data/cpu-usage.png)
+
+## Case 3: All Nodes Hold the Same Data
+
+In the third test case, which represents the most common scenario, all nodes 
contain identical data.
+
+### Running the Integrated Test
+
+This test case runs as an integrated test that handles all steps automatically:
+
+```bash
+cd test/property_repair/same_data
+# Run the complete integrated test
+go test . -v -timeout 3h -count=1
+```
+
+The test performs the following steps:
+1. Starts a 3-node cluster using docker-compose
+2. Creates a group with 2 replicas and loads 100,000 properties across all 
nodes
+3. Monitors the repair process through Prometheus metrics
+4. Verifies propagation count increases (repair count verification is skipped 
since data is consistent)
+
+The test then waits for the propagation to complete across the cluster; a fragment of the relaxed check is shown below.
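+
+Because the data is already consistent, the verification here only requires the propagation counter to advance; the repair counter may legitimately stay unchanged (a hedged fragment, reusing the metrics baseline idea from the monitoring sketch above):
+
+```go
+// Fragment: with identical data on every node, only total_propagation_count is
+// expected to grow; repair_success_count may not change at all.
+after := propertyrepair.GetAllNodeMetrics()
+propagationActive := false
+for i, b := range before {
+    if after[i].TotalPropagationCount > b.TotalPropagationCount {
+        propagationActive = true
+        break
+    }
+}
+```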
+
+### Result
+
+After waiting for the Property Repair process to complete, the following 
information was recorded:
+1. **Duration**: The total time taken was less than **1 minute**.
+2. **CPU Consumption**: The impact on CPU usage was negligible.
\ No newline at end of file
diff --git a/test/property_repair/base-compose.yml 
b/test/property_repair/base-compose.yml
new file mode 100644
index 00000000..112a48c5
--- /dev/null
+++ b/test/property_repair/base-compose.yml
@@ -0,0 +1,111 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '3.8'
+
+
+x-data-base: &data_base
+  extends:
+    file: ../docker/base-compose.yml
+    service: data
+  image: apache/skywalking-banyandb:latest
+  depends_on:
+    etcd:
+      condition: service_healthy
+  cpus: 2
+  mem_limit: 4g
+  command: &base_data_cmd |
+    data --etcd-endpoints=http://etcd:2379
+         --observability-modes=prometheus
+         --observability-listener-addr=:2121
+         --property-repair-enabled=true
+         --property-repair-build-tree-cron="@every 30s"
+         --stream-root-path=/tmp/banyandb-data
+         --measure-root-path=/tmp/banyandb-data
+         --property-root-path=/tmp/banyandb-data
+
+# Base services for property repair tests
+services:
+  etcd:
+    extends:
+      file: ../docker/base-compose.yml
+      service: etcd
+    ports:
+      - "2379:2379"
+
+  liaison:
+    extends:
+      file: ../docker/base-compose.yml
+      service: liaison
+    image: apache/skywalking-banyandb:latest
+    depends_on:
+      etcd:
+        condition: service_healthy
+    cpus: 2
+    mem_limit: 3g
+    container_name: liaison
+    ports:
+      - "17912:17912"
+      - "17913:17913"
+      - "2121:2121"  # observability port for metrics
+    command: liaison --etcd-endpoints=http://etcd:2379 
--observability-modes=prometheus
+
+  # Data nodes with specific configurations
+  data-node-1:
+    <<: *data_base
+    hostname: data-node-1
+    container_name: data-node-1
+    ports:
+      - "2122:2121"  # observability port
+      - "6061:6060"  # pprof port
+    command: |
+      data --etcd-endpoints=http://etcd:2379
+           --observability-modes=prometheus
+           --observability-listener-addr=:2121
+           --property-repair-enabled=true
+           --property-repair-build-tree-cron="@every 30s"
+           --stream-root-path=/tmp/banyandb-data
+           --measure-root-path=/tmp/banyandb-data
+           --property-root-path=/tmp/banyandb-data
+           --property-repair-trigger-cron="*/10 * * * *"
+
+  data-node-2:
+    <<: *data_base
+    hostname: data-node-2
+    container_name: data-node-2
+    ports:
+      - "2123:2121"  # observability port
+      - "6062:6060"  # pprof port
+    command: *base_data_cmd
+
+  data-node-3:
+    <<: *data_base
+    hostname: data-node-3
+    container_name: data-node-3
+    ports:
+      - "2124:2121"  # observability port
+      - "6063:6060"  # pprof port
+    command: *base_data_cmd
+
+  # Prometheus for monitoring
+  prometheus:
+    image: prom/prometheus:latest
+    ports:
+      - "9090:9090"
+    command:
+      - '--config.file=/etc/prometheus/prometheus.yml'
+      - '--storage.tsdb.path=/prometheus'
+      - '--web.console.libraries=/etc/prometheus/console_libraries'
+      - '--web.console.templates=/etc/prometheus/consoles'
\ No newline at end of file
diff --git a/test/property_repair/full_data/cpu-usage.png 
b/test/property_repair/full_data/cpu-usage.png
new file mode 100644
index 00000000..54410cad
Binary files /dev/null and b/test/property_repair/full_data/cpu-usage.png differ
diff --git a/test/property_repair/full_data/docker-compose-3nodes.yml 
b/test/property_repair/full_data/docker-compose-3nodes.yml
new file mode 100644
index 00000000..20663b00
--- /dev/null
+++ b/test/property_repair/full_data/docker-compose-3nodes.yml
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '3.8'
+
+services:
+  etcd:
+    extends:
+      file: ../base-compose.yml
+      service: etcd
+    networks:
+      - property-repair
+
+  liaison:
+    extends:
+      file: ../base-compose.yml
+      service: liaison
+    networks:
+      - property-repair
+
+  data-node-1:
+    extends:
+      file: ../base-compose.yml
+      service: data-node-1
+    networks:
+      - property-repair
+
+  data-node-2:
+    extends:
+      file: ../base-compose.yml
+      service: data-node-2
+    networks:
+      - property-repair
+
+  data-node-3:
+    extends:
+      file: ../base-compose.yml
+      service: data-node-3
+    networks:
+      - property-repair
+
+  prometheus:
+    extends:
+      file: ../base-compose.yml
+      service: prometheus
+    networks:
+      - property-repair
+    volumes:
+      - ../prometheus-3nodes.yml:/etc/prometheus/prometheus.yml
+
+networks:
+  property-repair:
+    driver: bridge
\ No newline at end of file
diff --git a/test/property_repair/full_data/integrated_test.go 
b/test/property_repair/full_data/integrated_test.go
new file mode 100644
index 00000000..1e4e3baa
--- /dev/null
+++ b/test/property_repair/full_data/integrated_test.go
@@ -0,0 +1,192 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package fulldata
+
+import (
+       "context"
+       "fmt"
+       "os"
+       "path/filepath"
+       "testing"
+       "time"
+
+       "github.com/onsi/ginkgo/v2"
+       "github.com/onsi/gomega"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       propertyrepair 
"github.com/apache/skywalking-banyandb/test/property_repair"
+)
+
+var (
+       composeFile           string
+       conn                  *grpc.ClientConn
+       groupClient           databasev1.GroupRegistryServiceClient
+       propertyClient        databasev1.PropertyRegistryServiceClient
+       propertyServiceClient propertyv1.PropertyServiceClient
+)
+
+func TestPropertyRepairIntegrated(t *testing.T) {
+       gomega.RegisterFailHandler(ginkgo.Fail)
+       ginkgo.RunSpecs(t, "Property Repair Integrated Test Suite", 
ginkgo.Label("integration", "slow", "property_repair", "full_data"))
+}
+
+var _ = ginkgo.BeforeSuite(func() {
+       fmt.Println("Starting Property Repair Integration Test Suite...")
+
+       // Disable Ryuk reaper to avoid container creation issues
+       os.Setenv("TESTCONTAINERS_RYUK_DISABLED", "true")
+
+       // Set Docker host if needed (for local development)
+       if os.Getenv("DOCKER_HOST") == "" {
+               os.Setenv("DOCKER_HOST", "unix:///var/run/docker.sock")
+       }
+})
+
+var _ = ginkgo.AfterSuite(func() {
+       if conn != nil {
+               _ = conn.Close()
+       }
+       if composeFile != "" {
+               fmt.Println("Stopping compose stack...")
+               propertyrepair.ExecuteComposeCommand("-f", composeFile, "down")
+       }
+})
+
+var _ = ginkgo.Describe("Property Repair Full Data Test", ginkgo.Ordered, 
func() {
+       ginkgo.Describe("Step 1: Initial Data Load with 2 Nodes", func() {
+               ginkgo.It("Should start 3 data node cluster", func() {
+                       // Initialize compose stack with the 3-node configuration
+                       var err error
+                       composeFile, err = 
filepath.Abs("docker-compose-3nodes.yml")
+                       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+                       fmt.Printf("Using compose file: %s\n", composeFile)
+
+                       // Start the docker compose stack without waiting first
+                       fmt.Println("Starting services...")
+                       err = propertyrepair.ExecuteComposeCommand("-f", 
composeFile, "up", "-d")
+                       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+                       // Simple wait for services to be ready
+                       time.Sleep(10 * time.Second)
+               })
+
+               ginkgo.It("Should connect to liaison and setup clients", func() 
{
+                       var err error
+                       fmt.Println("Connecting to Liaison server...")
+
+                       conn, err = grpchelper.Conn(propertyrepair.LiaisonAddr, 
30*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials()))
+                       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+                       fmt.Println("Connected to Liaison server successfully")
+
+                       groupClient = 
databasev1.NewGroupRegistryServiceClient(conn)
+                       propertyClient = 
databasev1.NewPropertyRegistryServiceClient(conn)
+                       propertyServiceClient = 
propertyv1.NewPropertyServiceClient(conn)
+               })
+
+               ginkgo.It("Should create group with 1 replica and write 100k 
properties", func() {
+                       ctx := context.Background()
+
+                       fmt.Println("=== Step 1: Creating group with 1 replica 
and loading initial data ===")
+
+                       // Create group with 1 replica
+                       propertyrepair.CreateGroup(ctx, groupClient, 1)
+
+                       // Create property schema
+                       propertyrepair.CreatePropertySchema(ctx, propertyClient)
+
+                       // Write 100,000 properties
+                       fmt.Println("Starting to write 100,000 properties...")
+                       startTime := time.Now()
+
+                       err := propertyrepair.WriteProperties(ctx, 
propertyServiceClient, 0, 100000)
+                       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+                       duration := time.Since(startTime)
+                       fmt.Printf("=== Step 1 completed: wrote 100,000 
properties in %v ===\n", duration)
+               })
+       })
+
+       ginkgo.Describe("Step 2: Add 3rd Node and Update Replicas", func() {
+               ginkgo.It("Should update group replicas to 2", func() {
+                       ctx := context.Background()
+
+                       fmt.Println("Updating group replicas from 1 to 2...")
+                       startTime := time.Now()
+
+                       // Update group replicas to 2
+                       propertyrepair.UpdateGroupReplicas(ctx, groupClient, 2)
+
+                       duration := time.Since(startTime)
+                       fmt.Printf("=== Step 2 completed: updated replicas to 2 
in %v ===\n", duration)
+               })
+       })
+
+       ginkgo.Describe("Verification", func() {
+               ginkgo.It("Should verify the property repair completed and 
prometheus metrics", func() {
+                       fmt.Println("=== Verification: Property repair process 
and prometheus metrics ===")
+
+                       // Get initial metrics from all data nodes
+                       fmt.Println("Reading initial prometheus metrics from 
all data nodes...")
+                       beforeMetrics := propertyrepair.GetAllNodeMetrics()
+
+                       // Print initial metrics state
+                       fmt.Println("Initial metrics state:")
+                       for _, metrics := range beforeMetrics {
+                               
gomega.Expect(metrics.IsHealthy).To(gomega.BeTrue(),
+                                       fmt.Sprintf("Node %s should be healthy 
before verification: %s",
+                                               metrics.NodeName, 
metrics.ErrorMessage))
+                               fmt.Printf("- %s: total_propagation_count=%d, 
repair_success_count=%d\n",
+                                       metrics.NodeName, 
metrics.TotalPropagationCount, metrics.RepairSuccessCount)
+                       }
+
+                       fmt.Println("\n=== Triggering property repair by 
waiting for scheduled repair cycle ===")
+                       fmt.Println("Waiting for property repair to trigger 
(@every 10 minutes)...")
+
+                       gomega.Eventually(func() bool {
+                               time.Sleep(time.Second * 30)
+                               // Get metrics after repair
+                               fmt.Println("Trying to read Prometheus metrics to check repair status...")
+                               afterMetrics := 
propertyrepair.GetAllNodeMetrics()
+                               
propertyrepair.PrintMetricsComparison(beforeMetrics, afterMetrics)
+
+                               // Check all node health, no crash
+                               for _, metrics := range afterMetrics {
+                                       
gomega.Expect(metrics.IsHealthy).To(gomega.BeTrue(),
+                                               fmt.Sprintf("Node %s should be 
healthy after repair: %s",
+                                                       metrics.NodeName, 
metrics.ErrorMessage))
+                               }
+
+                               // Check that the repair cycle has finished and at least one property was repaired successfully.
+                               isAnyRepairFinished := false
+                               for i, before := range beforeMetrics {
+                                       after := afterMetrics[i]
+                                       if before.TotalPropagationCount < 
after.TotalPropagationCount &&
+                                               before.RepairSuccessCount < 
after.RepairSuccessCount {
+                                               isAnyRepairFinished = true
+                                       }
+                               }
+                               return isAnyRepairFinished
+                       }, time.Hour*2).Should(gomega.BeTrue(), "Property 
repair cycle should complete within the expected time")
+               })
+       })
+})
diff --git a/test/property_repair/half_data/cpu-usage.png 
b/test/property_repair/half_data/cpu-usage.png
new file mode 100644
index 00000000..fd367531
Binary files /dev/null and b/test/property_repair/half_data/cpu-usage.png differ
diff --git a/test/property_repair/half_data/docker-compose-3nodes.yml 
b/test/property_repair/half_data/docker-compose-3nodes.yml
new file mode 100644
index 00000000..ce2a2070
--- /dev/null
+++ b/test/property_repair/half_data/docker-compose-3nodes.yml
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '3.8'
+
+volumes:
+  node1-data: {}
+  node2-data: {}
+  node3-data: {}
+
+services:
+  etcd:
+    extends:
+      file: ../base-compose.yml
+      service: etcd
+    networks:
+      - half-data
+
+  liaison:
+    extends:
+      file: ../base-compose.yml
+      service: liaison
+    networks:
+      - half-data
+
+  data-node-1:
+    extends:
+      file: ../base-compose.yml
+      service: data-node-1
+    volumes:
+      - node1-data:/tmp/banyandb-data
+    networks:
+      - half-data
+
+  data-node-2:
+    extends:
+      file: ../base-compose.yml
+      service: data-node-2
+    volumes:
+      - node2-data:/tmp/banyandb-data
+    networks:
+      - half-data
+
+  data-node-3:
+    extends:
+      file: ../base-compose.yml
+      service: data-node-3
+    volumes:
+      - node3-data:/tmp/banyandb-data
+    networks:
+      - half-data
+
+  prometheus:
+    extends:
+      file: ../base-compose.yml
+      service: prometheus
+    networks:
+      - half-data
+    volumes:
+      - ../prometheus-3nodes.yml:/etc/prometheus/prometheus.yml
+
+networks:
+  half-data:
+    driver: bridge
\ No newline at end of file
diff --git a/test/property_repair/half_data/integrated_test.go 
b/test/property_repair/half_data/integrated_test.go
new file mode 100644
index 00000000..119d0b49
--- /dev/null
+++ b/test/property_repair/half_data/integrated_test.go
@@ -0,0 +1,219 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package halfdata
+
+import (
+       "context"
+       "fmt"
+       "os"
+       "path/filepath"
+       "testing"
+       "time"
+
+       "github.com/onsi/ginkgo/v2"
+       "github.com/onsi/gomega"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       propertyrepair 
"github.com/apache/skywalking-banyandb/test/property_repair"
+)
+
+var (
+       composeFile           string
+       conn                  *grpc.ClientConn
+       groupClient           databasev1.GroupRegistryServiceClient
+       propertyClient        databasev1.PropertyRegistryServiceClient
+       propertyServiceClient propertyv1.PropertyServiceClient
+)
+
+func TestPropertyRepairHalfData(t *testing.T) {
+       gomega.RegisterFailHandler(ginkgo.Fail)
+       ginkgo.RunSpecs(t, "Property Repair Half Data Test Suite", 
ginkgo.Label("integration", "slow", "property_repair", "half_data"))
+}
+
+var _ = ginkgo.BeforeSuite(func() {
+       fmt.Println("Starting Property Repair Half Data Integration Test 
Suite...")
+
+       // Disable Ryuk reaper to avoid container creation issues
+       os.Setenv("TESTCONTAINERS_RYUK_DISABLED", "true")
+
+       // Set Docker host if needed (for local development)
+       if os.Getenv("DOCKER_HOST") == "" {
+               os.Setenv("DOCKER_HOST", "unix:///var/run/docker.sock")
+       }
+})
+
+var _ = ginkgo.AfterSuite(func() {
+       if conn != nil {
+               _ = conn.Close()
+       }
+       if composeFile != "" {
+               fmt.Println("Stopping compose stack...")
+               _ = propertyrepair.ExecuteComposeCommand("-f", composeFile, "down")
+       }
+})
+
+var _ = ginkgo.Describe("Property Repair Half Data Test", ginkgo.Ordered, 
func() {
+       ginkgo.Describe("Step 1: Initial Setup and Data Load", func() {
+               ginkgo.It("Should start 3 data node cluster", func() {
+                       // Initialize compose stack with 3-node configuration
+                       var err error
+                       composeFile, err = 
filepath.Abs("docker-compose-3nodes.yml")
+                       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+                       fmt.Printf("Using compose file: %s\n", composeFile)
+
+                       // Start the docker compose stack without waiting first
+                       fmt.Println("Starting services...")
+                       err = propertyrepair.ExecuteComposeCommand("-f", 
composeFile, "up", "-d")
+                       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+                       // Simple wait for services to be ready
+                       time.Sleep(10 * time.Second)
+               })
+
+               ginkgo.It("Should connect to liaison and setup clients", func() 
{
+                       var err error
+                       fmt.Println("Connecting to Liaison server...")
+
+                       conn, err = grpchelper.Conn(propertyrepair.LiaisonAddr, 
30*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials()))
+                       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+                       fmt.Println("Connected to Liaison server successfully")
+
+                       groupClient = 
databasev1.NewGroupRegistryServiceClient(conn)
+                       propertyClient = 
databasev1.NewPropertyRegistryServiceClient(conn)
+                       propertyServiceClient = 
propertyv1.NewPropertyServiceClient(conn)
+               })
+
+               ginkgo.It("Should create group with 2 replicas and write 50k 
properties", func() {
+                       ctx := context.Background()
+
+                       fmt.Println("=== Step 1: Creating group with 2 replicas 
and loading initial data ===")
+
+                       // Create group with 2 replicas
+                       propertyrepair.CreateGroup(ctx, groupClient, 2)
+
+                       // Create property schema
+                       propertyrepair.CreatePropertySchema(ctx, propertyClient)
+
+                       // Write 50,000 properties
+                       fmt.Println("Starting to write 50,000 properties...")
+                       startTime := time.Now()
+
+                       err := propertyrepair.WriteProperties(ctx, 
propertyServiceClient, 0, 50000)
+                       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+                       duration := time.Since(startTime)
+                       fmt.Printf("=== Step 1 completed: wrote 50,000 
properties in %v ===\n", duration)
+               })
+       })
+
+       ginkgo.Describe("Step 2: Reduce Replicas and Add More Data", func() {
+               ginkgo.It("Should reduce group replicas to 1", func() {
+                       ctx := context.Background()
+
+                       fmt.Println("Reducing group replicas from 2 to 1...")
+                       startTime := time.Now()
+
+                       // Update group replicas to 1
+                       propertyrepair.UpdateGroupReplicas(ctx, groupClient, 1)
+
+                       duration := time.Since(startTime)
+                       fmt.Printf("=== Step 2 completed: updated replicas to 1 
in %v ===\n", duration)
+               })
+
+               ginkgo.It("Should write additional 50k properties", func() {
+                       ctx := context.Background()
+
+                       fmt.Println("=== Step 3: Writing additional properties 
===")
+                       startTime := time.Now()
+
+                       // Write another 50,000 properties (50000-100000)
+                       err := propertyrepair.WriteProperties(ctx, 
propertyServiceClient, 50000, 100000)
+                       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+                       duration := time.Since(startTime)
+                       fmt.Printf("=== Step 3 completed: wrote additional 
50,000 properties in %v ===\n", duration)
+               })
+
+               ginkgo.It("Should increase group replicas back to 2", func() {
+                       ctx := context.Background()
+
+                       fmt.Println("Increasing group replicas from 1 to 2...")
+                       startTime := time.Now()
+
+                       // Update group replicas to 2
+                       propertyrepair.UpdateGroupReplicas(ctx, groupClient, 2)
+
+                       duration := time.Since(startTime)
+                       fmt.Printf("=== Step 4 completed: updated replicas to 2 
in %v ===\n", duration)
+               })
+       })
+
+       ginkgo.Describe("Verification", func() {
+               ginkgo.It("Should verify the property repair completed and 
prometheus metrics", func() {
+                       fmt.Println("=== Verification: Property repair process 
and prometheus metrics ===")
+
+                       // Get initial metrics from all data nodes
+                       fmt.Println("Reading initial prometheus metrics from 
all data nodes...")
+                       beforeMetrics := propertyrepair.GetAllNodeMetrics()
+
+                       // Print initial metrics state
+                       fmt.Println("Initial metrics state:")
+                       for _, metrics := range beforeMetrics {
+                               
gomega.Expect(metrics.IsHealthy).To(gomega.BeTrue(),
+                                       fmt.Sprintf("Node %s should be healthy 
before verification: %s",
+                                               metrics.NodeName, 
metrics.ErrorMessage))
+                               fmt.Printf("- %s: total_propagation_count=%d, 
repair_success_count=%d\n",
+                                       metrics.NodeName, 
metrics.TotalPropagationCount, metrics.RepairSuccessCount)
+                       }
+
+                       fmt.Println("\n=== Triggering property repair by 
waiting for scheduled repair cycle ===")
+                       fmt.Println("Waiting for property repair to trigger 
(@every 10 minutes)...")
+
+                       gomega.Eventually(func() bool {
+                               time.Sleep(time.Second * 30)
+                               // Get metrics after repair
+                               fmt.Println("Trying to read Prometheus metrics to check repair status...")
+                               afterMetrics := 
propertyrepair.GetAllNodeMetrics()
+                               
propertyrepair.PrintMetricsComparison(beforeMetrics, afterMetrics)
+
+                               // Check all node health, no crash
+                               for _, metrics := range afterMetrics {
+                                       
gomega.Expect(metrics.IsHealthy).To(gomega.BeTrue(),
+                                               fmt.Sprintf("Node %s should be 
healthy after repair: %s",
+                                                       metrics.NodeName, 
metrics.ErrorMessage))
+                               }
+
+                               // Check that the repair cycle has finished and at least one property was repaired successfully.
+                               isAnyRepairFinished := false
+                               for i, before := range beforeMetrics {
+                                       after := afterMetrics[i]
+                                       if before.TotalPropagationCount < 
after.TotalPropagationCount &&
+                                               before.RepairSuccessCount < 
after.RepairSuccessCount {
+                                               isAnyRepairFinished = true
+                                       }
+                               }
+                               return isAnyRepairFinished
+                       }, time.Hour*2).Should(gomega.BeTrue(), "Property 
repair cycle should complete within the expected time")
+               })
+       })
+})
diff --git a/test/property_repair/prometheus-3nodes.yml 
b/test/property_repair/prometheus-3nodes.yml
new file mode 100644
index 00000000..23968dfc
--- /dev/null
+++ b/test/property_repair/prometheus-3nodes.yml
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+global:
+  scrape_interval: 15s
+  evaluation_interval: 15s
+
+scrape_configs:
+  - job_name: 'liaison'
+    static_configs:
+      - targets: ['liaison:2121']
+    scrape_interval: 5s
+    metrics_path: '/metrics'
+    
+  - job_name: 'data-node-1'
+    static_configs:
+      - targets: ['data-node-1:2121']
+    scrape_interval: 5s
+    metrics_path: '/metrics'
+    
+  - job_name: 'data-node-2'
+    static_configs:
+      - targets: ['data-node-2:2121']
+    scrape_interval: 5s
+    metrics_path: '/metrics'
+    
+  - job_name: 'data-node-3'
+    static_configs:
+      - targets: ['data-node-3:2121']
+    scrape_interval: 5s
+    metrics_path: '/metrics'
\ No newline at end of file
diff --git a/test/property_repair/same_data/docker-compose-3nodes.yml 
b/test/property_repair/same_data/docker-compose-3nodes.yml
new file mode 100644
index 00000000..e0ef9802
--- /dev/null
+++ b/test/property_repair/same_data/docker-compose-3nodes.yml
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '3.8'
+
+services:
+  etcd:
+    extends:
+      file: ../base-compose.yml
+      service: etcd
+    networks:
+      - no-data
+
+  liaison:
+    extends:
+      file: ../base-compose.yml
+      service: liaison
+    networks:
+      - no-data
+
+  data-node-1:
+    extends:
+      file: ../base-compose.yml
+      service: data-node-1
+    networks:
+      - no-data
+
+  data-node-2:
+    extends:
+      file: ../base-compose.yml
+      service: data-node-2
+    networks:
+      - no-data
+
+  data-node-3:
+    extends:
+      file: ../base-compose.yml
+      service: data-node-3
+    networks:
+      - no-data
+
+  prometheus:
+    extends:
+      file: ../base-compose.yml
+      service: prometheus
+    networks:
+      - no-data
+    volumes:
+      - ../prometheus-3nodes.yml:/etc/prometheus/prometheus.yml
+
+networks:
+  no-data:
+    driver: bridge
\ No newline at end of file
diff --git a/test/property_repair/same_data/integrated_test.go 
b/test/property_repair/same_data/integrated_test.go
new file mode 100644
index 00000000..e4674c8c
--- /dev/null
+++ b/test/property_repair/same_data/integrated_test.go
@@ -0,0 +1,178 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package samedata
+
+import (
+       "context"
+       "fmt"
+       "os"
+       "path/filepath"
+       "testing"
+       "time"
+
+       "github.com/onsi/ginkgo/v2"
+       "github.com/onsi/gomega"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       propertyrepair 
"github.com/apache/skywalking-banyandb/test/property_repair"
+)
+
+var (
+       composeFile           string
+       conn                  *grpc.ClientConn
+       groupClient           databasev1.GroupRegistryServiceClient
+       propertyClient        databasev1.PropertyRegistryServiceClient
+       propertyServiceClient propertyv1.PropertyServiceClient
+)
+
+func TestPropertyRepairSameData(t *testing.T) {
+       gomega.RegisterFailHandler(ginkgo.Fail)
+       ginkgo.RunSpecs(t, "Property Repair Same Data Test Suite", 
ginkgo.Label("integration", "slow", "property_repair", "same_data"))
+}
+
+var _ = ginkgo.BeforeSuite(func() {
+       fmt.Println("Starting Property Repair Same Data Integration Test 
Suite...")
+
+       // Disable Ryuk reaper to avoid container creation issues
+       os.Setenv("TESTCONTAINERS_RYUK_DISABLED", "true")
+
+       // Set Docker host if needed (for local development)
+       if os.Getenv("DOCKER_HOST") == "" {
+               os.Setenv("DOCKER_HOST", "unix:///var/run/docker.sock")
+       }
+})
+
+var _ = ginkgo.AfterSuite(func() {
+       if conn != nil {
+               _ = conn.Close()
+       }
+       if composeFile != "" {
+               fmt.Println("Stopping compose stack...")
+               propertyrepair.ExecuteComposeCommand("-f", composeFile, "down")
+       }
+})
+
+var _ = ginkgo.Describe("Property Repair Same Data Test", ginkgo.Ordered, 
func() {
+       ginkgo.Describe("Step 1: Initial Data Load", func() {
+               ginkgo.It("Should start 3 data node cluster", func() {
+                       // Initialize compose stack with 3-node configuration
+                       var err error
+                       composeFile, err = 
filepath.Abs("docker-compose-3nodes.yml")
+                       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+                       fmt.Printf("Using compose file: %s\n", composeFile)
+
+                       // Start the docker compose stack without waiting first
+                       fmt.Println("Starting services...")
+                       err = propertyrepair.ExecuteComposeCommand("-f", 
composeFile, "up", "-d")
+                       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+                       // Simple wait for services to be ready
+                       time.Sleep(10 * time.Second)
+               })
+
+               ginkgo.It("Should connect to liaison and setup clients", func() 
{
+                       var err error
+                       fmt.Println("Connecting to Liaison server...")
+
+                       conn, err = grpchelper.Conn(propertyrepair.LiaisonAddr, 
30*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials()))
+                       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+                       fmt.Println("Connected to Liaison server successfully")
+
+                       groupClient = 
databasev1.NewGroupRegistryServiceClient(conn)
+                       propertyClient = 
databasev1.NewPropertyRegistryServiceClient(conn)
+                       propertyServiceClient = 
propertyv1.NewPropertyServiceClient(conn)
+               })
+
+               ginkgo.It("Should create group with 2 replicas and write 100k 
properties", func() {
+                       ctx := context.Background()
+
+                       fmt.Println("=== Step 1: Creating group with 2 replicas 
and loading data ===")
+
+                       // Create group with 2 replicas
+                       propertyrepair.CreateGroup(ctx, groupClient, 2)
+
+                       // Create property schema
+                       propertyrepair.CreatePropertySchema(ctx, propertyClient)
+
+                       // Write 100,000 properties (same amount across all 
replicas)
+                       fmt.Println("Starting to write 100,000 properties...")
+                       startTime := time.Now()
+
+                       err := propertyrepair.WriteProperties(ctx, 
propertyServiceClient, 0, 100000)
+                       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+                       duration := time.Since(startTime)
+                       fmt.Printf("=== Step 1 completed: wrote 100,000 
properties in %v ===\n", duration)
+               })
+       })
+
+       ginkgo.Describe("Verification", func() {
+               ginkgo.It("Should verify the property repair completed and 
prometheus metrics", func() {
+                       fmt.Println("=== Verification: Property repair process 
and prometheus metrics ===")
+
+                       // Get initial metrics from all data nodes
+                       fmt.Println("Reading initial prometheus metrics from 
all data nodes...")
+                       beforeMetrics := propertyrepair.GetAllNodeMetrics()
+
+                       // Print initial metrics state
+                       fmt.Println("Initial metrics state:")
+                       for _, metrics := range beforeMetrics {
+                               gomega.Expect(metrics.IsHealthy).To(gomega.BeTrue(),
+                                       fmt.Sprintf("Node %s should be healthy before verification: %s",
+                                               metrics.NodeName, metrics.ErrorMessage))
+                               fmt.Printf("- %s: total_propagation_count=%d, repair_success_count=%d\n",
+                                       metrics.NodeName, metrics.TotalPropagationCount, metrics.RepairSuccessCount)
+                       }
+
+                       fmt.Println("\n=== Triggering property repair by waiting for scheduled repair cycle ===")
+                       fmt.Println("Waiting for property repair to trigger (@every 10 minutes)...")
+
+                       gomega.Eventually(func() bool {
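+                               // Each attempt pauses 30 seconds before re-reading the metrics; gomega.Eventually keeps retrying until the two-hour deadline set at the end of this block.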
+                               time.Sleep(time.Second * 30)
+                               // Get metrics after repair
+                               fmt.Println("Trying to read prometheus metrics to check repair status...")
+                               afterMetrics := propertyrepair.GetAllNodeMetrics()
+                               propertyrepair.PrintMetricsComparison(beforeMetrics, afterMetrics)
+
+                               // Check all node health, no crash
+                               for _, metrics := range afterMetrics {
+                                       gomega.Expect(metrics.IsHealthy).To(gomega.BeTrue(),
+                                               fmt.Sprintf("Node %s should be healthy after repair: %s",
+                                                       metrics.NodeName, metrics.ErrorMessage))
+                               }
+
+                               // For the same-data scenario, only verify that the propagation count increased (not the repair count).
+                               // Since the data is consistent, repairs may not be needed, but propagation should still occur.
+                               isPropagationActive := false
+                               for i, before := range beforeMetrics {
+                                       after := afterMetrics[i]
+                                       if before.TotalPropagationCount < after.TotalPropagationCount {
+                                               isPropagationActive = true
+                                               break
+                                       }
+                               }
+                               return isPropagationActive
+                       }, time.Hour*2).Should(gomega.BeTrue(), "Property propagation should be active even with consistent data")
+               })
+       })
+})
diff --git a/test/property_repair/shared_utils.go b/test/property_repair/shared_utils.go
new file mode 100644
index 00000000..12c9cebd
--- /dev/null
+++ b/test/property_repair/shared_utils.go
@@ -0,0 +1,428 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package propertyrepair provides utilities for property repair performance testing in BanyanDB.
+package propertyrepair
+
+import (
+       "context"
+       "crypto/rand"
+       "fmt"
+       "io"
+       "net/http"
+       "os"
+       "os/exec"
+       "regexp"
+       "strconv"
+       "strings"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       "github.com/onsi/ginkgo/v2"
+       "github.com/onsi/gomega"
+
+       commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+)
+
+// Constants for property repair performance testing.
+const (
+       DataSize     = 2048 // 2KB per property
+       LiaisonAddr  = "localhost:17912"
+       Concurrency  = 6
+       GroupName    = "perf-test-group"
+       PropertyName = "perf-test-property"
+)
+
+// PrometheusEndpoints defines the prometheus endpoints for data nodes.
+var PrometheusEndpoints = []string{
+       "http://localhost:2122/metrics", // data-node-1
+       "http://localhost:2123/metrics", // data-node-2
+       "http://localhost:2124/metrics", // data-node-3
+}
+
+// NodeMetrics represents the metrics for a data node.
+type NodeMetrics struct {
+       LastScrapeTime        time.Time
+       NodeName              string
+       ErrorMessage          string
+       TotalPropagationCount int64
+       RepairSuccessCount    int64
+       IsHealthy             bool
+}
+
+// GenerateLargeData creates a string of the specified size filled with random characters.
+func GenerateLargeData(size int) string {
+       const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
+
+       // Generate some random bytes
+       randomBytes := make([]byte, 32)
+       _, err := rand.Read(randomBytes)
+       if err != nil {
+               // Fallback to timestamp-based data
+               baseData := fmt.Sprintf("timestamp-%d-", time.Now().UnixNano())
+               // Round up so the repeated string is always at least `size` bytes before slicing.
+               repeats := size/len(baseData) + 1
+               return strings.Repeat(baseData, repeats)[:size]
+       }
+
+       // Create base string from random bytes
+       var baseBuilder strings.Builder
+       for _, b := range randomBytes {
+               baseBuilder.WriteByte(charset[b%byte(len(charset))])
+       }
+       baseData := baseBuilder.String()
+
+       // Repeat to reach desired size
+       repeats := (size / len(baseData)) + 1
+       result := strings.Repeat(baseData, repeats)
+
+       if len(result) > size {
+               return result[:size]
+       }
+       return result
+}
+
+// FormatDuration formats a duration to a human-readable string.
+func FormatDuration(duration time.Duration) string {
+       if duration < time.Second {
+               return fmt.Sprintf("%dms", duration.Milliseconds())
+       }
+       if duration < time.Minute {
+               return fmt.Sprintf("%.1fs", duration.Seconds())
+       }
+       return fmt.Sprintf("%.1fm", duration.Minutes())
+}
+
+// FormatThroughput calculates and formats throughput.
+func FormatThroughput(count int64, duration time.Duration) string {
+       if duration == 0 {
+               return "N/A"
+       }
+       throughput := float64(count) / duration.Seconds()
+       return fmt.Sprintf("%.1f/s", throughput)
+}
+
+// CreateGroup creates a property group with specified parameters.
+func CreateGroup(ctx context.Context, groupClient databasev1.GroupRegistryServiceClient, replicaNum uint32) {
+       fmt.Printf("Creating group %s with %d replicas...\n", GroupName, replicaNum)
+       _, err := groupClient.Create(ctx, &databasev1.GroupRegistryServiceCreateRequest{
+               Group: &commonv1.Group{
+                       Metadata: &commonv1.Metadata{
+                               Name: GroupName,
+                       },
+                       Catalog: commonv1.Catalog_CATALOG_PROPERTY,
+                       ResourceOpts: &commonv1.ResourceOpts{
+                               ShardNum: 1,
+                               Replicas: replicaNum,
+                       },
+               },
+       })
+       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+}
+
+// UpdateGroupReplicas updates the replica number of an existing group.
+func UpdateGroupReplicas(ctx context.Context, groupClient databasev1.GroupRegistryServiceClient, newReplicaNum uint32) {
+       fmt.Printf("Updating group %s to %d replicas...\n", GroupName, newReplicaNum)
+       _, err := groupClient.Update(ctx, &databasev1.GroupRegistryServiceUpdateRequest{
+               Group: &commonv1.Group{
+                       Metadata: &commonv1.Metadata{
+                               Name: GroupName,
+                       },
+                       Catalog: commonv1.Catalog_CATALOG_PROPERTY,
+                       ResourceOpts: &commonv1.ResourceOpts{
+                               ShardNum: 1,
+                               Replicas: newReplicaNum,
+                       },
+               },
+       })
+       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+}
+
+// CreatePropertySchema creates a property schema.
+func CreatePropertySchema(ctx context.Context, propertyClient databasev1.PropertyRegistryServiceClient) {
+       fmt.Printf("Creating property schema %s...\n", PropertyName)
+       _, err := propertyClient.Create(ctx, &databasev1.PropertyRegistryServiceCreateRequest{
+               Property: &databasev1.Property{
+                       Metadata: &commonv1.Metadata{
+                               Name:  PropertyName,
+                               Group: GroupName,
+                       },
+                       Tags: []*databasev1.TagSpec{
+                               {Name: "data", Type: databasev1.TagType_TAG_TYPE_STRING},
+                               {Name: "timestamp", Type: databasev1.TagType_TAG_TYPE_STRING},
+                       },
+               },
+       })
+       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+}
+
+// WriteProperties writes a batch of properties concurrently.
+func WriteProperties(ctx context.Context, propertyServiceClient propertyv1.PropertyServiceClient,
+       startIdx int, endIdx int,
+) error {
+       fmt.Printf("Starting to write properties %d-%d using %d goroutines...\n", startIdx, endIdx, Concurrency)
+
+       startTime := time.Now()
+
+       // Channel to generate property data
+       dataChannel := make(chan int, 1000) // Buffer for property indices
+       var wg sync.WaitGroup
+       var totalProcessed int64
+
+       // Start data producer goroutine
+       go func() {
+               defer close(dataChannel)
+               for i := startIdx; i < endIdx; i++ {
+                       dataChannel <- i
+               }
+       }()
+
+       // Start consumer goroutines
+       for i := 0; i < Concurrency; i++ {
+               wg.Add(1)
+               go func(workerID int) {
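+                       // GinkgoRecover turns gomega assertion failures inside this goroutine into Ginkgo test failures instead of raw panics.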
+                       defer ginkgo.GinkgoRecover()
+                       defer wg.Done()
+                       var count int64
+
+                       for propertyIndex := range dataChannel {
+                               propertyID := fmt.Sprintf("property-%d", propertyIndex)
+                               largeData := GenerateLargeData(DataSize)
+                               timestamp := time.Now().Format(time.RFC3339Nano)
+
+                               _, writeErr := propertyServiceClient.Apply(ctx, &propertyv1.ApplyRequest{
+                                       Property: &propertyv1.Property{
+                                               Metadata: &commonv1.Metadata{
+                                                       Name:  PropertyName,
+                                                       Group: GroupName,
+                                               },
+                                               Id: propertyID,
+                                               Tags: []*modelv1.Tag{
+                                                       {Key: "data", Value: &modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: largeData}}}},
+                                                       {Key: "timestamp", Value: &modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: timestamp}}}},
+                                               },
+                                       },
+                               })
+                               gomega.Expect(writeErr).NotTo(gomega.HaveOccurred())
+
+                               count++
+                               atomic.AddInt64(&totalProcessed, 1)
+
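+                               // Log aggregate progress every 500 successful writes across all workers.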
+                               if atomic.LoadInt64(&totalProcessed)%500 == 0 {
+                                       elapsed := time.Since(startTime)
+                                       totalCount := atomic.LoadInt64(&totalProcessed)
+                                       fmt.Printf("total processed: %d, elapsed: %v\n", totalCount, elapsed)
+                               }
+                       }
+
+                       fmt.Printf("Worker %d completed: processed %d properties total\n", workerID, count)
+               }(i)
+       }
+
+       wg.Wait()
+       endTime := time.Now()
+       duration := endTime.Sub(startTime)
+       written := int64(endIdx - startIdx)
+       fmt.Printf("Write completed: %d properties in %s (%s props/sec)\n",
+               written, FormatDuration(duration), FormatThroughput(written, duration))
+       return nil
+}
+
+// GetNodeMetrics fetches prometheus metrics from a single data node endpoint.
+func GetNodeMetrics(endpoint string, nodeIndex int) *NodeMetrics {
+       nodeName := fmt.Sprintf("data-node-%d", nodeIndex+1)
+       metrics := &NodeMetrics{
+               NodeName:       nodeName,
+               LastScrapeTime: time.Now(),
+               IsHealthy:      false,
+       }
+
+       // Set timeout for HTTP request
+       client := &http.Client{
+               Timeout: 10 * time.Second,
+       }
+
+       resp, err := client.Get(endpoint)
+       if err != nil {
+               metrics.ErrorMessage = fmt.Sprintf("Failed to connect to %s: %v", endpoint, err)
+               return metrics
+       }
+       defer resp.Body.Close()
+
+       if resp.StatusCode != http.StatusOK {
+               metrics.ErrorMessage = fmt.Sprintf("HTTP error %d from %s", resp.StatusCode, endpoint)
+               return metrics
+       }
+
+       body, err := io.ReadAll(resp.Body)
+       if err != nil {
+               metrics.ErrorMessage = fmt.Sprintf("Failed to read response from %s: %v", endpoint, err)
+               return metrics
+       }
+
+       // Parse metrics from prometheus data
+       content := string(body)
+       totalPropagationCount := parseTotalPropagationCount(content)
+       repairSuccessCount := parseRepairSuccessCount(content)
+
+       metrics.TotalPropagationCount = totalPropagationCount
+       metrics.RepairSuccessCount = repairSuccessCount
+       metrics.IsHealthy = true
+       return metrics
+}
+
+// parseTotalPropagationCount parses the total_propagation_count from prometheus metrics text.
+func parseTotalPropagationCount(content string) int64 {
+       // Look for metric lines like: banyandb_property_repair_gossip_server_total_propagation_count{group="perf-test-group",original_node="data-node-1:17912"} 3
+       re := regexp.MustCompile(`banyandb_property_repair_gossip_server_total_propagation_count\{[^}]+\}\s+(\d+(?:\.\d+)?)`)
+       matches := re.FindAllStringSubmatch(content, -1)
+
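+       // Sum the values across every matching series (one per group/original_node label combination).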
+       var totalCount int64
+       for _, match := range matches {
+               if len(match) >= 2 {
+                       value, err := strconv.ParseFloat(match[1], 64)
+                       if err != nil {
+                               continue
+                       }
+                       totalCount += int64(value)
+               }
+       }
+
+       return totalCount
+}
+
+// parseRepairSuccessCount parses the repair_success_count from prometheus metrics text.
+func parseRepairSuccessCount(content string) int64 {
+       // Look for metric lines like: banyandb_property_scheduler_property_repair_success_count{group="perf-test-group",shard="0"} 100
+       re := regexp.MustCompile(`banyandb_property_scheduler_property_repair_success_count\{[^}]+\}\s+(\d+(?:\.\d+)?)`)
+       matches := re.FindAllStringSubmatch(content, -1)
+
+       var totalCount int64
+       for _, match := range matches {
+               if len(match) >= 2 {
+                       value, err := strconv.ParseFloat(match[1], 64)
+                       if err != nil {
+                               continue
+                       }
+                       totalCount += int64(value)
+               }
+       }
+
+       return totalCount
+}
+
+// GetAllNodeMetrics fetches metrics from all data nodes concurrently.
+func GetAllNodeMetrics() []*NodeMetrics {
+       var wg sync.WaitGroup
+       metrics := make([]*NodeMetrics, len(PrometheusEndpoints))
+
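+       // Scrape each endpoint in its own goroutine; every goroutine writes to a distinct slice index, so no locking is needed.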
+       for i, endpoint := range PrometheusEndpoints {
+               wg.Add(1)
+               go func(index int, url string) {
+                       defer wg.Done()
+                       metrics[index] = GetNodeMetrics(url, index)
+               }(i, endpoint)
+       }
+
+       wg.Wait()
+       return metrics
+}
+
+// VerifyPropagationCountIncreased compares metrics before and after to verify total_propagation_count increased by exactly 1.
+func VerifyPropagationCountIncreased(beforeMetrics, afterMetrics []*NodeMetrics) error {
+       if len(beforeMetrics) != len(afterMetrics) {
+               return fmt.Errorf("metrics array length mismatch: before=%d, after=%d", len(beforeMetrics), len(afterMetrics))
+       }
+
+       for i, after := range afterMetrics {
+               before := beforeMetrics[i]
+
+               if !after.IsHealthy {
+                       return fmt.Errorf("node %s is not healthy after update: %s", after.NodeName, after.ErrorMessage)
+               }
+
+               if !before.IsHealthy {
+                       return fmt.Errorf("node %s was not healthy before update: %s", before.NodeName, before.ErrorMessage)
+               }
+
+               expectedCount := before.TotalPropagationCount + 1
+               if after.TotalPropagationCount != expectedCount {
+                       return fmt.Errorf("node %s propagation count mismatch: expected=%d, actual=%d (before=%d)",
+                               after.NodeName, expectedCount, after.TotalPropagationCount, before.TotalPropagationCount)
+               }
+       }
+
+       return nil
+}
+
+// PrintMetricsComparison prints a comparison of metrics before and after.
+func PrintMetricsComparison(beforeMetrics, afterMetrics []*NodeMetrics) {
+       fmt.Println("=== Prometheus Metrics Comparison ===")
+       fmt.Printf("%-12s | %-29s | %-29s | %-7s\n", "Node", "Propagation Count", "Repair Success Count", "Healthy")
+       fmt.Printf("%-12s | %-9s %-9s %-9s | %-9s %-9s %-9s | %-7s\n", "", "Before", "After", "Delta", "Before", "After", "Delta", "")
+       fmt.Println(strings.Repeat("-", 85))
+
+       for i, after := range afterMetrics {
+               if i < len(beforeMetrics) {
+                       before := beforeMetrics[i]
+                       propagationDelta := after.TotalPropagationCount - before.TotalPropagationCount
+                       repairDelta := after.RepairSuccessCount - before.RepairSuccessCount
+                       healthStatus := "✓"
+                       if !after.IsHealthy {
+                               healthStatus = "✗"
+                       }
+
+                       fmt.Printf("%-12s | %-9d %-9d %-9d | %-9d %-9d %-9d | %-7s\n",
+                               after.NodeName,
+                               before.TotalPropagationCount, after.TotalPropagationCount, propagationDelta,
+                               before.RepairSuccessCount, after.RepairSuccessCount, repairDelta,
+                               healthStatus)
+               }
+       }
+       fmt.Println()
+}
+
+// ExecuteComposeCommand executes a docker-compose command, supporting both v1 and v2.
+func ExecuteComposeCommand(args ...string) error {
+       // Prefer Docker Compose v2 (the "docker compose" plugin) when it is available.
+       if _, err := exec.LookPath("docker"); err == nil {
+               check := exec.Command("docker", "compose", "version")
+               if out, err := check.CombinedOutput(); err == nil && strings.Contains(string(out), "Docker Compose") {
+                       composeArgs := append([]string{"compose"}, args...)
+                       cmd := exec.Command("docker", composeArgs...)
+                       cmd.Stdout = os.Stdout
+                       cmd.Stderr = os.Stderr
+                       return cmd.Run()
+               }
+       }
+
+       // Fall back to the standalone docker-compose binary (v1).
+       if _, err := exec.LookPath("docker-compose"); err == nil {
+               cmd := exec.Command("docker-compose", args...)
+               cmd.Stdout = os.Stdout
+               cmd.Stderr = os.Stderr
+               return cmd.Run()
+       }
+
+       return fmt.Errorf("neither 'docker compose' (v2) nor 'docker-compose' (v1) is available in PATH")
+}
