This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch dbsl
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 04d5ffb6b5b019651fc97efa3bd61d8c6f605af1 Author: mrproliu <741550...@qq.com> AuthorDate: Tue Aug 26 22:15:37 2025 +0700 Add benchmark and fix some issue of background property repair (#725) --- .github/workflows/slow-test.yml | 7 + api/proto/banyandb/property/v1/repair.proto | 20 +- banyand/liaison/grpc/property.go | 9 +- banyand/property/gossip/client.go | 15 +- banyand/property/gossip/server.go | 9 +- banyand/property/gossip/service.go | 2 + banyand/property/gossip/trace.go | 12 +- banyand/property/repair.go | 224 +++++++---- banyand/property/repair_gossip.go | 185 +++++---- bydbctl/internal/cmd/property_test.go | 244 ++++++++++++ docs/api-reference.md | 58 ++- test/docker/base-compose.yml | 1 + test/property_repair/README.md | 142 +++++++ test/property_repair/base-compose.yml | 111 ++++++ test/property_repair/full_data/cpu-usage.png | Bin 0 -> 60836 bytes .../full_data/docker-compose-3nodes.yml | 65 ++++ test/property_repair/full_data/integrated_test.go | 192 +++++++++ test/property_repair/half_data/cpu-usage.png | Bin 0 -> 54254 bytes .../half_data/docker-compose-3nodes.yml | 76 ++++ test/property_repair/half_data/integrated_test.go | 219 +++++++++++ test/property_repair/prometheus-3nodes.yml | 43 +++ .../same_data/docker-compose-3nodes.yml | 65 ++++ test/property_repair/same_data/integrated_test.go | 178 +++++++++ test/property_repair/shared_utils.go | 428 +++++++++++++++++++++ 24 files changed, 2144 insertions(+), 161 deletions(-) diff --git a/.github/workflows/slow-test.yml b/.github/workflows/slow-test.yml index c62496d3..8974c19d 100644 --- a/.github/workflows/slow-test.yml +++ b/.github/workflows/slow-test.yml @@ -31,3 +31,10 @@ jobs: with: options: --label-filter slow timeout-minutes: 120 + + property-repair: + if: github.repository == 'apache/skywalking-banyandb' + uses: ./.github/workflows/test.yml + with: + options: --label-filter property_repair + timeout-minutes: 120 diff --git a/api/proto/banyandb/property/v1/repair.proto b/api/proto/banyandb/property/v1/repair.proto index 24510ed9..7c158598 100644 --- a/api/proto/banyandb/property/v1/repair.proto +++ b/api/proto/banyandb/property/v1/repair.proto @@ -69,7 +69,18 @@ message PropertySync { int64 delete_time = 3; } -message NoMorePropertySync {} +enum PropertySyncFromType { + PROPERTY_SYNC_FROM_TYPE_UNSPECIFIED = 0; + PROPERTY_SYNC_FROM_TYPE_MISSING = 1; // client missing but server existing + PROPERTY_SYNC_FROM_TYPE_SYNC = 2; // client existing but server missing or SHA value mismatches +} + +message PropertySyncWithFrom { + PropertySyncFromType from = 1; + PropertySync property = 2; +} + +message WaitNextDifferData {} message RepairRequest { oneof data { @@ -82,9 +93,8 @@ message RepairRequest { // case 2: client existing but server missing // case 3: SHA value mismatches PropertySync property_sync = 4; - // if client side is already send all the properties(missing or property sync) - // which means the client side will not sending more properties to sync, server side should close the stream. 
- NoMorePropertySync no_more_property_sync = 5; + // wait next differ tree summary for process + WaitNextDifferData wait_next_differ = 5; } } @@ -96,7 +106,7 @@ message RepairResponse { // repair stage // case 1: return from PropertyMissing // case 3: return if the client is older - PropertySync property_sync = 3; + PropertySyncWithFrom property_sync = 3; } } diff --git a/banyand/liaison/grpc/property.go b/banyand/liaison/grpc/property.go index 9fe4aa7e..cd4921c7 100644 --- a/banyand/liaison/grpc/property.go +++ b/banyand/liaison/grpc/property.go @@ -19,6 +19,7 @@ package grpc import ( "context" + "fmt" "math" "sync" "time" @@ -280,15 +281,21 @@ func (ps *propertyServer) replaceProperty(ctx context.Context, now time.Time, sh Id: propertypkg.GetPropertyID(cur), Property: cur, } + var successCount int futures := make([]bus.Future, 0, len(nodes)) for _, node := range nodes { f, err := ps.pipeline.Publish(ctx, data.TopicPropertyUpdate, bus.NewMessageWithNode(bus.MessageID(time.Now().Unix()), node, req)) if err != nil { - return nil, errors.Wrapf(err, "failed to publish property update to node %s", node) + ps.log.Debug().Err(err).Str("node", node).Msg("failed to publish property update") + continue } + successCount++ futures = append(futures, f) } + if successCount == 0 { + return nil, fmt.Errorf("failed to publish property update to any node") + } // Wait for all futures to complete, and which should last have one success haveSuccess := false var lastestError error diff --git a/banyand/property/gossip/client.go b/banyand/property/gossip/client.go index 2bf87ca5..5342e304 100644 --- a/banyand/property/gossip/client.go +++ b/banyand/property/gossip/client.go @@ -114,6 +114,7 @@ func (s *service) getRegisteredNode(id string) (*databasev1.Node, bool) { s.mu.RLock() defer s.mu.RUnlock() node, exist := s.registered[id] + s.log.Debug().Str("node", id).Bool("exist", exist).Int("register_count", len(s.registered)).Msg("get registered gossip node") return node, exist } @@ -121,6 +122,9 @@ func (s *service) OnAddOrUpdate(md schema.Metadata) { if s.traceStreamSelector != nil { s.traceStreamSelector.(schema.EventHandler).OnAddOrUpdate(md) } + if selEventHandler, ok := s.sel.(schema.EventHandler); ok { + selEventHandler.OnAddOrUpdate(md) + } if md.Kind != schema.KindNode { return } @@ -129,10 +133,6 @@ func (s *service) OnAddOrUpdate(md schema.Metadata) { s.log.Warn().Msg("invalid metadata type") return } - s.sel.AddNode(node) - if s.traceStreamSelector != nil { - s.traceStreamSelector.AddNode(node) - } address := node.PropertyRepairGossipGrpcAddress if address == "" { s.log.Warn().Stringer("node", node).Msg("node does not have gossip address, skipping registration") @@ -143,6 +143,10 @@ func (s *service) OnAddOrUpdate(md schema.Metadata) { s.log.Warn().Stringer("node", node).Msg("node does not have a name, skipping registration") return } + s.sel.AddNode(node) + if s.traceStreamSelector != nil { + s.traceStreamSelector.AddNode(node) + } s.mu.Lock() defer s.mu.Unlock() @@ -155,6 +159,9 @@ func (s *service) OnDelete(md schema.Metadata) { if s.traceStreamSelector != nil { s.traceStreamSelector.(schema.EventHandler).OnDelete(md) } + if selEventHandler, ok := s.sel.(schema.EventHandler); ok { + selEventHandler.OnDelete(md) + } if md.Kind != schema.KindNode { return } diff --git a/banyand/property/gossip/server.go b/banyand/property/gossip/server.go index aa5fc36c..689586d6 100644 --- a/banyand/property/gossip/server.go +++ b/banyand/property/gossip/server.go @@ -51,7 +51,7 @@ var ( }]}`, serviceName) // 
perNodeSyncTimeout is the timeout for each node to sync the property data. - perNodeSyncTimeout = time.Minute * 10 + perNodeSyncTimeout = time.Hour * 1 ) func (s *service) Subscribe(listener MessageListener) { @@ -188,11 +188,15 @@ func (q *protocolHandler) handle(ctx context.Context, request *handlingRequest) if err != nil { q.s.serverMetrics.totalSendToNextErr.Inc(1, request.Group) handlingSpan.Error(err.Error()) + q.s.log.Warn().Err(err).Stringer("request", request). + Msgf("failed to handle gossip message for propagation") } q.s.serverMetrics.totalFinished.Inc(1, request.Group) q.s.serverMetrics.totalLatency.Inc(n.Sub(time.Unix(0, now)).Seconds(), request.Group) if !needsKeepPropagation { - q.s.serverMetrics.totalPropagationCount.Inc(float64(request.Context.CurrentPropagationCount), + q.s.log.Info().Str("group", request.Group).Uint32("shardNum", request.ShardId). + Msgf("propagation message for propagation is finished") + q.s.serverMetrics.totalPropagationCount.Inc(1, request.Group, request.Context.OriginNode) q.s.serverMetrics.totalPropagationPercent.Observe( float64(request.Context.CurrentPropagationCount)/float64(request.Context.MaxPropagationCount), request.Group) @@ -388,6 +392,7 @@ func (s *service) newConnectionFromNode(n *databasev1.Node) (*grpc.ClientConn, e return nil, fmt.Errorf("failed to get client transport credentials: %w", err) } conn, err := grpc.NewClient(n.PropertyRepairGossipGrpcAddress, append(credOpts, grpc.WithDefaultServiceConfig(retryPolicy))...) + s.log.Debug().Str("address", n.PropertyRepairGossipGrpcAddress).Msg("starting to create gRPC client connection to node") if err != nil { return nil, fmt.Errorf("failed to create gRPC client connection to node %s: %w", n.PropertyRepairGossipGrpcAddress, err) } diff --git a/banyand/property/gossip/service.go b/banyand/property/gossip/service.go index 0f76fa4a..db44a65d 100644 --- a/banyand/property/gossip/service.go +++ b/banyand/property/gossip/service.go @@ -112,6 +112,7 @@ func NewMessenger(omr observability.MetricsRegistry, metadata metadata.Repo, pip registered: make(map[string]*databasev1.Node), scheduleInterval: time.Hour * 2, sel: node.NewRoundRobinSelector("", metadata), + port: 17932, } } @@ -133,6 +134,7 @@ func (s *service) PreRun(ctx context.Context) error { s.listeners = make([]MessageListener, 0) s.serverMetrics = newServerMetrics(s.omr.With(serverScope)) if s.metadata != nil { + s.sel.OnInit([]schema.Kind{schema.KindGroup}) s.metadata.RegisterHandler("property-repair-nodes", schema.KindNode, s) s.metadata.RegisterHandler("property-repair-groups", schema.KindGroup, s) if err := s.initTracing(ctx); err != nil { diff --git a/banyand/property/gossip/trace.go b/banyand/property/gossip/trace.go index 1f18e132..6f5ed6da 100644 --- a/banyand/property/gossip/trace.go +++ b/banyand/property/gossip/trace.go @@ -22,6 +22,7 @@ import ( "encoding/json" "errors" "fmt" + "sync" "sync/atomic" "time" @@ -285,11 +286,12 @@ type recordTrace struct { id string allSpans []*recordTraceSpan roundNum int + lock sync.Mutex } func (r *recordTrace) CreateSpan(parent Span, message string) Span { spanID := fmt.Sprintf("%s_%d", r.s.nodeID, time.Now().UnixNano()) - r.request.TraceContext.ParentSpanId = spanID + r.changeParentID(spanID) span := &recordTraceSpan{ trace: r, id: spanID, @@ -302,6 +304,12 @@ func (r *recordTrace) CreateSpan(parent Span, message string) Span { return span } +func (r *recordTrace) changeParentID(id string) { + r.lock.Lock() + defer r.lock.Unlock() + r.request.TraceContext.ParentSpanId = id +} + func (r 
*recordTrace) ActivateSpan() Span { return r.currentSpan } @@ -350,7 +358,7 @@ func (r *recordTraceSpan) End() { // if still have parent span, then this is not the root span // change the context to parent span if r.parent != nil { - r.trace.request.TraceContext.ParentSpanId = r.parent.ID() + r.trace.changeParentID(r.parent.ID()) } } diff --git a/banyand/property/repair.go b/banyand/property/repair.go index cf174201..fb0b07a3 100644 --- a/banyand/property/repair.go +++ b/banyand/property/repair.go @@ -62,7 +62,6 @@ import ( const ( repairBatchSearchSize = 100 - repairFileNewLine = '\n' ) type repair struct { @@ -126,6 +125,7 @@ func (r *repair) checkHasUpdates() (bool, error) { } func (r *repair) buildStatus(ctx context.Context, snapshotPath string, group string) (err error) { + r.l.Debug().Msgf("starting building status from snapshot path %s, group: %s", snapshotPath, group) startTime := time.Now() defer func() { r.metrics.totalBuildTreeFinished.Inc(1) @@ -449,20 +449,27 @@ func (r *repairTreeFileReader) seekPosition(offset int64, whence int) error { return nil } -func (r *repairTreeFileReader) read(parent *repairTreeNode, pagingSize int64, forceReFromStart bool) ([]*repairTreeNode, error) { +func (r *repairTreeFileReader) read(parent *repairTreeNode, pagingSize int64, forceReFromStart bool) (nodes []*repairTreeNode, err error) { + defer func() { + recoverErr := recover() + if recoverErr != nil { + if recoverData, ok := recoverErr.(error); ok && err == nil { + err = fmt.Errorf("reading repair tree file %s failure: %w", r.file.Name(), recoverData) + } + } + }() if parent == nil { // reading the root node - err := r.seekPosition(r.footer.slotNodeFinishedOffset, io.SeekStart) - if err != nil { + if err = r.seekPosition(r.footer.slotNodeFinishedOffset, io.SeekStart); err != nil { return nil, fmt.Errorf("seeking to root node offset %d in file %s failure: %w", r.footer.slotNodeFinishedOffset, r.file.Name(), err) } rootDataBytes := make([]byte, r.footer.rootNodeLen) if _, err = io.ReadFull(r.reader, rootDataBytes); err != nil { return nil, fmt.Errorf("reading root node data from file %s failure: %w", r.file.Name(), err) } - _, shaValue, err := encoding.DecodeBytes(rootDataBytes) - if err != nil { - return nil, fmt.Errorf("decoding root node sha value from file %s failure: %w", r.file.Name(), err) + _, shaValue, decodeErr := encoding.DecodeBytes(rootDataBytes) + if decodeErr != nil { + return nil, fmt.Errorf("decoding root node sha value from file %s failure: %w", r.file.Name(), decodeErr) } return []*repairTreeNode{ { @@ -472,54 +479,8 @@ func (r *repairTreeFileReader) read(parent *repairTreeNode, pagingSize int64, fo }, nil } - var err error if parent.tp == repairTreeNodeTypeRoot { - needSeek := false - if r.paging == nil || r.paging.lastNode != parent || forceReFromStart { - needSeek = true - r.paging = newRepairTreeReaderPage(parent, r.footer.slotNodeCount) - } - if needSeek { - // reading the slot nodes - if err = r.seekPosition(r.footer.leafNodeFinishedOffset, io.SeekStart); err != nil { - return nil, fmt.Errorf("seeking to slot node offset %d in file %s failure: %w", r.footer.leafNodeFinishedOffset, r.file.Name(), err) - } - } - var slotNodeIndex, leafStartOff, leafCount int64 - var slotShaVal, slotDataBytes []byte - count := r.paging.nextPage(pagingSize) - nodes := make([]*repairTreeNode, 0, count) - for i := int64(0); i < count; i++ { - slotDataBytes, err = r.reader.ReadBytes(repairFileNewLine) - if err != nil { - return nil, fmt.Errorf("reading slot node data from file %s failure: %w", 
r.file.Name(), err) - } - slotDataBytes, slotNodeIndex, err = encoding.BytesToVarInt64(slotDataBytes) - if err != nil { - return nil, fmt.Errorf("decoding slot node index from file %s failure: %w", r.file.Name(), err) - } - slotDataBytes, slotShaVal, err = encoding.DecodeBytes(slotDataBytes) - if err != nil { - return nil, fmt.Errorf("decoding slot node sha value from file %s failure: %w", r.file.Name(), err) - } - slotDataBytes, leafStartOff, err = encoding.BytesToVarInt64(slotDataBytes) - if err != nil { - return nil, fmt.Errorf("decoding slot node leaf start offset from file %s failure: %w", r.file.Name(), err) - } - _, leafCount, err = encoding.BytesToVarInt64(slotDataBytes) - if err != nil { - return nil, fmt.Errorf("decoding slot node leaf length from file %s failure: %w", r.file.Name(), err) - } - nodes = append(nodes, &repairTreeNode{ - shaValue: string(slotShaVal), - slotInx: int32(slotNodeIndex), - tp: repairTreeNodeTypeSlot, - leafStart: leafStartOff, - leafCount: leafCount, - }) - } - - return nodes, nil + return r.readSlots(parent, pagingSize, forceReFromStart) } else if parent.tp == repairTreeNodeTypeLeaf { return nil, nil } @@ -538,11 +499,16 @@ func (r *repairTreeFileReader) read(parent *repairTreeNode, pagingSize int64, fo } var entity, shaVal []byte count := r.paging.nextPage(pagingSize) - nodes := make([]*repairTreeNode, 0, count) for i := int64(0); i < count; i++ { - leafDataBytes, err := r.reader.ReadBytes(repairFileNewLine) + var dataSize int64 + err = binary.Read(r.reader, binary.LittleEndian, &dataSize) if err != nil { - return nil, fmt.Errorf("reading leaf node data from file %s failure: %w", r.file.Name(), err) + return nil, fmt.Errorf("reading leaf node data size from file %s failure: %w", r.file.Name(), err) + } + leafDataBytes := make([]byte, dataSize) + leafReadLen, err := io.ReadFull(r.reader, leafDataBytes) + if err != nil { + return nil, fmt.Errorf("reading leaf node data from file %s failure(readed: %d): %w", r.file.Name(), leafReadLen, err) } leafDataBytes, entity, err = encoding.DecodeBytes(leafDataBytes) if err != nil { @@ -562,6 +528,61 @@ func (r *repairTreeFileReader) read(parent *repairTreeNode, pagingSize int64, fo return nodes, nil } +func (r *repairTreeFileReader) readSlots(parent *repairTreeNode, pagingSize int64, forceReFromStart bool) (nodes []*repairTreeNode, err error) { + needSeek := false + if r.paging == nil || r.paging.lastNode != parent || forceReFromStart { + needSeek = true + r.paging = newRepairTreeReaderPage(parent, r.footer.slotNodeCount) + } + if needSeek { + // reading the slot nodes + if err = r.seekPosition(r.footer.leafNodeFinishedOffset, io.SeekStart); err != nil { + return nil, fmt.Errorf("seeking to slot node offset %d in file %s failure: %w", r.footer.leafNodeFinishedOffset, r.file.Name(), err) + } + } + var slotNodeIndex, leafStartOff, leafCount int64 + var slotShaVal, slotDataBytes []byte + count := r.paging.nextPage(pagingSize) + for i := int64(0); i < count; i++ { + var dataSize int64 + err = binary.Read(r.reader, binary.LittleEndian, &dataSize) + if err != nil { + return nil, fmt.Errorf("reading slot node data size from file %s failure: %w", r.file.Name(), err) + } + slotDataBytes = make([]byte, dataSize) + var readSize int + readSize, err = io.ReadFull(r.reader, slotDataBytes) + if err != nil { + return nil, fmt.Errorf("reading slot node data from file %s failure: %w, read %d bytes", r.file.Name(), err, readSize) + } + slotDataBytes, slotNodeIndex, err = encoding.BytesToVarInt64(slotDataBytes) + if err != nil { + 
return nil, fmt.Errorf("decoding slot node index from file %s failure: %w", r.file.Name(), err) + } + slotDataBytes, slotShaVal, err = encoding.DecodeBytes(slotDataBytes) + if err != nil { + return nil, fmt.Errorf("decoding slot node sha value from file %s failure: %w", r.file.Name(), err) + } + slotDataBytes, leafStartOff, err = encoding.BytesToVarInt64(slotDataBytes) + if err != nil { + return nil, fmt.Errorf("decoding slot node leaf start offset from file %s failure: %w", r.file.Name(), err) + } + _, leafCount, err = encoding.BytesToVarInt64(slotDataBytes) + if err != nil { + return nil, fmt.Errorf("decoding slot node leaf length from file %s failure: %w", r.file.Name(), err) + } + nodes = append(nodes, &repairTreeNode{ + shaValue: string(slotShaVal), + slotInx: int32(slotNodeIndex), + tp: repairTreeNodeTypeSlot, + leafStart: leafStartOff, + leafCount: leafCount, + }) + } + + return nodes, nil +} + func (r *repairTreeFileReader) close() error { return r.file.Close() } @@ -756,7 +777,10 @@ func (r *repairSlotFile) append(entity, shaValue []byte) error { result := make([]byte, 0) result = encoding.EncodeBytes(result, entity) result = encoding.EncodeBytes(result, shaValue) - result = append(result, repairFileNewLine) + err = binary.Write(r.writer, binary.LittleEndian, int64(len(result))) + if err != nil { + return fmt.Errorf("writing entity and sha value length to repair slot file %s failure: %w", r.path, err) + } _, err = r.writer.Write(result) if err != nil { return fmt.Errorf("writing entity and sha value to repair slot file %s failure: %w", r.path, err) @@ -846,7 +870,15 @@ func (r *repairTreeBuilder) build() (err error) { data = encoding.EncodeBytes(data, slot.shaVal) data = encoding.VarInt64ToBytes(data, slot.startOff) data = encoding.VarInt64ToBytes(data, slot.leafCount) - data = append(data, repairFileNewLine) + + slotTotalLen := int64(len(data)) + err = binary.Write(r.writer, binary.LittleEndian, slotTotalLen) + if err != nil { + return fmt.Errorf("writing slot node to repair tree file failure: %w", err) + } + slotNodesLen += 8 + slotNodesFinishedOffset += 8 + writedLen, err = r.writer.Write(data) if err != nil { return fmt.Errorf("writing slot node to repair tree file failure: %w", err) @@ -932,18 +964,20 @@ type repairScheduler struct { latestBuildTreeSchedule time.Time buildTreeClock clock.Clock gossipMessenger gossip.Messenger - closer *run.Closer - buildSnapshotFunc func(context.Context) (string, error) + l *logger.Logger + gossipRepairing *int32 repairTreeScheduler *timestamp.Scheduler quickRepairNotified *int32 db *database - l *logger.Logger + closer *run.Closer metrics *repairSchedulerMetrics - treeSlotCount int + buildSnapshotFunc func(context.Context) (string, error) + scheduleBasicFile string buildTreeScheduleInterval time.Duration quickBuildTreeTime time.Duration - lastBuildTimeLocker sync.Mutex + treeSlotCount int treeLocker sync.RWMutex + lastBuildTimeLocker sync.Mutex } // nolint: contextcheck @@ -970,6 +1004,8 @@ func newRepairScheduler( metrics: newRepairSchedulerMetrics(omr.With(propertyScope.SubScope("scheduler"))), gossipMessenger: gossipMessenger, treeSlotCount: treeSlotCount, + scheduleBasicFile: filepath.Join(db.repairBaseDir, "scheduled.json"), + gossipRepairing: new(int32), } c := timestamp.NewScheduler(l, s.buildTreeClock) s.repairTreeScheduler = c @@ -982,10 +1018,13 @@ func newRepairScheduler( } err = c.Register("trigger", cron.Minute|cron.Hour|cron.Dom|cron.Month|cron.Dow|cron.Descriptor, triggerCronExp, func(time.Time, *logger.Logger) bool { - 
gossipErr := s.doRepairGossip(s.closer.Ctx()) + l.Debug().Msgf("starting background repair gossip") + group, shardNum, nodes, gossipErr := s.doRepairGossip(s.closer.Ctx()) if gossipErr != nil { s.l.Err(gossipErr).Msg("failed to repair gossip") + return true } + s.l.Info().Str("group", group).Uint32("shardNum", shardNum).Strs("nodes", nodes).Msg("background repair gossip scheduled") return true }) if err != nil { @@ -998,6 +1037,14 @@ func newRepairScheduler( return s, nil } +func (r *repairScheduler) setGossipRepairing(repairing bool) { + val := int32(0) + if repairing { + val = 1 + } + atomic.StoreInt32(r.gossipRepairing, val) +} + func (r *repairScheduler) doBuildTreeScheduler(t time.Time, triggerByCron bool) { if !r.verifyShouldExecuteBuildTree(t, triggerByCron) { return @@ -1012,6 +1059,12 @@ func (r *repairScheduler) doBuildTreeScheduler(t time.Time, triggerByCron bool) func (r *repairScheduler) verifyShouldExecuteBuildTree(t time.Time, triggerByCron bool) bool { r.lastBuildTimeLocker.Lock() defer r.lastBuildTimeLocker.Unlock() + // ignore the build tree if the gossip repairing is in progress + if atomic.LoadInt32(r.gossipRepairing) == 1 { + r.l.Debug().Msg("gossip repairing is in progress, skipping build tree") + return false + } + if !triggerByCron { // if not triggered by cron, we need to check if the time is after the (last scheduled time + half of the interval) if r.buildTreeClock.Now().After(r.latestBuildTreeSchedule.Add(r.buildTreeScheduleInterval / 2)) { @@ -1035,6 +1088,33 @@ func (r *repairScheduler) initializeInterval() error { return nil } +func (r *repairScheduler) saveHasBuildTree() error { + // save the latest build tree time to the database + now := time.Now() + data := make(map[string]string, 1) + data["lastBuildTreeTime"] = now.Format(time.RFC3339) + json, err := json.Marshal(data) + if err != nil { + return err + } + err = os.MkdirAll(path.Dir(r.scheduleBasicFile), storage.FilePerm) + if err != nil { + return fmt.Errorf("creating directory for repair build tree file %s failure: %w", r.scheduleBasicFile, err) + } + return os.WriteFile(r.scheduleBasicFile, json, storage.FilePerm) +} + +func (r *repairScheduler) checkHasBuildTree() (bool, error) { + _, err := os.Stat(r.scheduleBasicFile) + if err != nil { + if os.IsNotExist(err) { + return false, nil // no build tree file, means no build tree has been done + } + return false, fmt.Errorf("checking repair build tree file existence failure: %w", err) + } + return true, nil +} + //nolint:contextcheck func (r *repairScheduler) doBuildTree() (err error) { now := time.Now() @@ -1043,8 +1123,14 @@ func (r *repairScheduler) doBuildTree() (err error) { r.metrics.totalRepairBuildTreeFinished.Inc(1) r.metrics.totalRepairBuildTreeLatency.Inc(time.Since(now).Seconds()) if err != nil { + r.l.Err(err).Msg("repair build tree failed") r.metrics.totalRepairBuildTreeFailures.Inc(1) } + + saveStatusErr := r.saveHasBuildTree() + if saveStatusErr != nil { + r.l.Err(saveStatusErr).Msgf("saving repair build tree status failure") + } }() sLst := r.db.sLst.Load() if sLst == nil { @@ -1140,17 +1226,17 @@ func (r *repairScheduler) close() { r.closer.CloseThenWait() } -func (r *repairScheduler) doRepairGossip(ctx context.Context) error { +func (r *repairScheduler) doRepairGossip(ctx context.Context) (string, uint32, []string, error) { group, shardNum, err := r.randomSelectGroup(ctx) if err != nil { - return fmt.Errorf("selecting random group failure: %w", err) + return "", 0, nil, fmt.Errorf("selecting random group failure: %w", err) } nodes, err 
:= r.gossipMessenger.LocateNodes(group.Metadata.Name, shardNum, uint32(r.copiesCount(group))) if err != nil { - return fmt.Errorf("locating nodes for group %s, shard %d failure: %w", group.Metadata.Name, shardNum, err) + return "", 0, nil, fmt.Errorf("locating nodes for group %s, shard %d failure: %w", group.Metadata.Name, shardNum, err) } - return r.gossipMessenger.Propagation(nodes, group.Metadata.Name, shardNum) + return group.Metadata.Name, shardNum, nodes, r.gossipMessenger.Propagation(nodes, group.Metadata.Name, shardNum) } func (r *repairScheduler) randomSelectGroup(ctx context.Context) (*commonv1.Group, uint32, error) { diff --git a/banyand/property/repair_gossip.go b/banyand/property/repair_gossip.go index 4468c300..5babb981 100644 --- a/banyand/property/repair_gossip.go +++ b/banyand/property/repair_gossip.go @@ -57,6 +57,13 @@ func (b *repairGossipBase) getTreeReader(ctx context.Context, group string, shar if err != nil { return nil, false, fmt.Errorf("failed to check state file existence for group %s: %w", group, err) } + if !stateExist { + // check has scheduled or not + stateExist, err = b.scheduler.checkHasBuildTree() + if err != nil { + return nil, false, fmt.Errorf("failed to check if the tree state file exists: %w", err) + } + } // if the tree is nil, it means the tree is no data return &emptyRepairTreeReader{}, stateExist, nil } @@ -198,7 +205,9 @@ func (r *repairGossipClient) Rev(ctx context.Context, tracer gossip.Trace, nextN startSyncSpan.Tag(gossip.TraceTagTargetNode, nextNode.Target()) client := propertyv1.NewRepairServiceClient(nextNode) var hasPropertyUpdated bool + r.scheduler.setGossipRepairing(true) defer func() { + r.scheduler.setGossipRepairing(false) if err != nil { startSyncSpan.Tag("has_property_updates", fmt.Sprintf("%t", hasPropertyUpdated)) startSyncSpan.Error(err.Error()) @@ -243,6 +252,7 @@ func (r *repairGossipClient) Rev(ctx context.Context, tracer gossip.Trace, nextN sendTreeSummarySpan.End() // if the root node matched, then ignore the repair if rootMatch { + r.scheduler.l.Debug().Msgf("tree root for group %s, shard %d matched, no need to repair", request.Group, request.ShardId) return nil } @@ -259,6 +269,7 @@ func (r *repairGossipClient) Rev(ctx context.Context, tracer gossip.Trace, nextN recvResp, err := stream.Recv() if err != nil { if errors.Is(err, io.EOF) { + r.scheduler.l.Debug().Msgf("no more messages from server, client side finished syncing properties for group %s, shard %d", request.Group, request.ShardId) return nil } return fmt.Errorf("failed to keep receive tree summary from server: %w", err) @@ -274,27 +285,38 @@ func (r *repairGossipClient) Rev(ctx context.Context, tracer gossip.Trace, nextN differSpan.End() return nil } + r.scheduler.l.Debug().Msgf("received differ tree summary from server, nodes count: %d", len(resp.DifferTreeSummary.Nodes)) r.handleDifferSummaryFromServer(ctx, stream, resp.DifferTreeSummary, reader, syncShard, rootNode, leafReader, ¬ProcessingClientNode, ¤tComparingClientNode) firstTreeSummaryResp = false + + if err = stream.Send(&propertyv1.RepairRequest{ + Data: &propertyv1.RepairRequest_WaitNextDiffer{ + WaitNextDiffer: &propertyv1.WaitNextDifferData{}, + }, + }); err != nil { + differSpan.Error(err.Error()) + r.scheduler.l.Warn().Err(err).Msgf("failed to send wait next differ request to server, group: %s, shard: %d", request.Group, request.ShardId) + } differSpan.End() case *propertyv1.RepairResponse_PropertySync: + r.scheduler.l.Debug().Msgf("received repair response from server") // step 3: keep receiving 
messages from the server // if the server still sending different nodes, we should keep reading them // if the server sends a PropertySync, we should repair the property and send the newer property back to the server if needed sync := resp.PropertySync syncSpan := tracer.CreateSpan(startSyncSpan, "repair property") syncSpan.Tag(gossip.TraceTagOperateType, gossip.TraceTagOperateRepairProperty) - syncSpan.Tag(gossip.TraceTagPropertyID, string(sync.Id)) - updated, newer, err := syncShard.repair(ctx, sync.Id, sync.Property, sync.DeleteTime) + syncSpan.Tag(gossip.TraceTagPropertyID, string(sync.Property.Id)) + updated, newer, err := syncShard.repair(ctx, sync.Property.Id, sync.Property.Property, sync.Property.DeleteTime) syncSpan.Tag("updated", fmt.Sprintf("%t", updated)) syncSpan.Tag("has_newer", fmt.Sprintf("%t", newer != nil)) if err != nil { syncSpan.Error(err.Error()) - r.scheduler.l.Warn().Err(err).Msgf("failed to repair property %s", sync.Id) + r.scheduler.l.Warn().Err(err).Msgf("failed to repair property %s", sync.Property.Id) r.scheduler.metrics.totalRepairFailedCount.Inc(1, request.Group, fmt.Sprintf("%d", request.ShardId)) } if updated { - r.scheduler.l.Debug().Msgf("successfully repaired property %s on client side", sync.Id) + r.scheduler.l.Debug().Msgf("successfully repaired property %s on client side", sync.Property.Id) r.scheduler.metrics.totalRepairSuccessCount.Inc(1, request.Group, fmt.Sprintf("%d", request.ShardId)) hasPropertyUpdated = true syncSpan.End() @@ -302,7 +324,10 @@ func (r *repairGossipClient) Rev(ctx context.Context, tracer gossip.Trace, nextN } // if the property hasn't been updated, and the newer property is not nil, // which means the property is newer than the server side, - if !updated && newer != nil { + + // if the sync.From is PROPERTY_MISSING, it means the client doesn't have the property, but the server side has, + // but the current client has the newer property, it's meaning the client has updated, so no need to send the property back to the server + if !updated && newer != nil && sync.From != propertyv1.PropertySyncFromType_PROPERTY_SYNC_FROM_TYPE_MISSING { var p propertyv1.Property err = protojson.Unmarshal(newer.source, &p) if err != nil { @@ -371,14 +396,7 @@ func (r *repairGossipClient) handleDifferSummaryFromServer( // then queried and sent property to server r.queryPropertyAndSendToServer(ctx, syncShard, (*notProcessingClientNode).entity, stream) } - err := stream.Send(&propertyv1.RepairRequest{ - Data: &propertyv1.RepairRequest_NoMorePropertySync{ - NoMorePropertySync: &propertyv1.NoMorePropertySync{}, - }, - }) - if err != nil { - r.scheduler.l.Warn().Err(err).Msgf("failed to send no more property sync request to server") - } + r.scheduler.l.Debug().Msg("no more property sync request sent to server") return } @@ -568,7 +586,11 @@ func newRepairGossipServer(s *repairScheduler) *repairGossipServer { } } -func (r *repairGossipServer) Repair(s grpclib.BidiStreamingServer[propertyv1.RepairRequest, propertyv1.RepairResponse]) error { +func (r *repairGossipServer) Repair(s grpclib.BidiStreamingServer[propertyv1.RepairRequest, propertyv1.RepairResponse]) (err error) { + r.scheduler.setGossipRepairing(true) + defer func() { + r.scheduler.setGossipRepairing(false) + }() summary, reader, err := r.combineTreeSummary(s) if err != nil { return fmt.Errorf("failed to receive tree summary request: %w", err) @@ -583,6 +605,9 @@ func (r *repairGossipServer) Repair(s grpclib.BidiStreamingServer[propertyv1.Rep shardID := summary.shardID var hasPropertyUpdated bool 
defer func() { + if err != nil { + r.scheduler.l.Warn().Err(err).Msgf("server failed to repair gossip for group %s, shard %d", group, shardID) + } if hasPropertyUpdated { err = r.scheduler.buildingTree([]common.ShardID{common.ShardID(shardID)}, group, true) if err != nil { @@ -621,42 +646,10 @@ func (r *repairGossipServer) Repair(s grpclib.BidiStreamingServer[propertyv1.Rep serverMissingSlots = append(serverMissingSlots, clientSlot.index) } } - sent, err := r.sendDifferSlots(reader, clientMismatchSlots, serverMissingSlots, s) - if err != nil { - r.scheduler.l.Warn().Err(err).Msgf("failed to send different slots to client") - } - // send the tree and no more different slots needs to be sent - err = r.sendEmptyDiffer(s) - if !sent { - return err - } else if err != nil { - // should keep the message receiving loop - r.scheduler.l.Warn().Msgf("sent no difference slot to client failure, error: %v", err) - } - for { - missingOrSyncRequest, err := s.Recv() - if err != nil { - if errors.Is(err, io.EOF) { - return nil - } - return fmt.Errorf("failed to receive missing or sync request: %w", err) - } - syncShard, err := r.scheduler.db.loadShard(s.Context(), common.ShardID(shardID)) - if err != nil { - return fmt.Errorf("shard %d load failure on server side: %w", shardID, err) - } - switch req := missingOrSyncRequest.Data.(type) { - case *propertyv1.RepairRequest_PropertyMissing: - r.processPropertyMissing(s.Context(), syncShard, req.PropertyMissing, s) - case *propertyv1.RepairRequest_PropertySync: - if r.processPropertySync(s.Context(), syncShard, req.PropertySync, s, group) { - hasPropertyUpdated = true - } - case *propertyv1.RepairRequest_NoMorePropertySync: - // if the client has no more property sync, the server side should stop the sync - return nil - } - } + + // send differ slots to the client and wait for the client process + r.sendDifferSlots(reader, clientMismatchSlots, serverMissingSlots, group, shardID, &hasPropertyUpdated, s) + return nil } func (r *repairGossipServer) combineTreeSummary( @@ -770,13 +763,17 @@ func (r *repairGossipServer) processPropertySync( // send the newer property to the client err = s.Send(&propertyv1.RepairResponse{ Data: &propertyv1.RepairResponse_PropertySync{ - PropertySync: &propertyv1.PropertySync{ - Id: newer.id, - Property: &p, - DeleteTime: newer.deleteTime, + PropertySync: &propertyv1.PropertySyncWithFrom{ + From: propertyv1.PropertySyncFromType_PROPERTY_SYNC_FROM_TYPE_SYNC, + Property: &propertyv1.PropertySync{ + Id: newer.id, + Property: &p, + DeleteTime: newer.deleteTime, + }, }, }, }) + r.scheduler.l.Debug().Msgf("sending repaired property %s on server side", sync.Id) if err != nil { r.scheduler.l.Warn().Err(err).Msgf("failed to send newer property sync response to client, entity: %s", newer.id) return false @@ -801,13 +798,17 @@ func (r *repairGossipServer) processPropertyMissing( } err = s.Send(&propertyv1.RepairResponse{ Data: &propertyv1.RepairResponse_PropertySync{ - PropertySync: &propertyv1.PropertySync{ - Id: property.id, - Property: data, - DeleteTime: property.deleteTime, + PropertySync: &propertyv1.PropertySyncWithFrom{ + From: propertyv1.PropertySyncFromType_PROPERTY_SYNC_FROM_TYPE_MISSING, + Property: &propertyv1.PropertySync{ + Id: property.id, + Property: data, + DeleteTime: property.deleteTime, + }, }, }, }) + r.scheduler.l.Debug().Msgf("sending missing property on server side: %s", property.id) if err != nil { r.scheduler.l.Warn().Err(err).Msgf("failed to send property sync response to client, entity: %s", missing.Entity) return @@ 
-818,16 +819,21 @@ func (r *repairGossipServer) sendDifferSlots( reader repairTreeReader, clientMismatchSlots []*repairTreeNode, serverMissingSlots []int32, + group string, + shardID uint32, + hasPropertyUpdated *bool, s grpclib.BidiStreamingServer[propertyv1.RepairRequest, propertyv1.RepairResponse], -) (hasSent bool, err error) { +) { var leafNodes []*repairTreeNode + var err error // send server mismatch slots to the client for _, node := range clientMismatchSlots { for { leafNodes, err = reader.read(node, gossipMerkleTreeReadPageSize, false) if err != nil { - return hasSent, fmt.Errorf("failed to read leaf nodes for slot %d: %w", node.slotInx, err) + r.scheduler.l.Warn().Err(err).Msgf("failed to read leaf nodes for slot %d", node.slotInx) + continue } // if there are no more leaf nodes, we can skip this slot if len(leafNodes) == 0 { @@ -853,8 +859,12 @@ func (r *repairGossipServer) sendDifferSlots( if err != nil { r.scheduler.l.Warn().Err(err). Msgf("failed to send leaf nodes for slot %d", node.slotInx) - } else { - hasSent = true + continue + } + + if err = r.recvMsgAndWaitReadNextDiffer(s, group, shardID, hasPropertyUpdated); err != nil { + r.scheduler.l.Warn().Err(err).Msgf("failed to waiting the client side process differ finished") + return } } } @@ -879,11 +889,56 @@ func (r *repairGossipServer) sendDifferSlots( if err != nil { r.scheduler.l.Warn().Err(err). Msgf("failed to send missing slots") - } else { - hasSent = true + return + } + r.scheduler.l.Debug().Msg("successfully sent differ-tree summary to client") + if err = r.recvMsgAndWaitReadNextDiffer(s, group, shardID, hasPropertyUpdated); err != nil { + r.scheduler.l.Warn().Err(err).Msgf("failed to waiting the client side process differ finished") + return + } + } + + // send the empty differ response to the client to make sure the client side finished processing + if err = r.sendEmptyDiffer(s); err != nil { + r.scheduler.l.Warn().Err(err).Msgf("failed to send empty differ response to client") + return + } + if err = r.recvMsgAndWaitReadNextDiffer(s, group, shardID, hasPropertyUpdated); err != nil { + r.scheduler.l.Warn().Err(err).Msgf("failed to waiting the client side process empty differ finished") + return + } +} + +func (r *repairGossipServer) recvMsgAndWaitReadNextDiffer( + s grpclib.BidiStreamingServer[propertyv1.RepairRequest, propertyv1.RepairResponse], + group string, + shardID uint32, + hasPropertyUpdated *bool, +) error { + for { + missingOrSyncRequest, err := s.Recv() + if err != nil { + if errors.Is(err, io.EOF) { + r.scheduler.l.Debug().Msgf("client closed the stream, no more missing or sync request") + return nil + } + return fmt.Errorf("failed to receive missing or sync request: %w", err) + } + syncShard, err := r.scheduler.db.loadShard(s.Context(), common.ShardID(shardID)) + if err != nil { + return fmt.Errorf("shard %d load failure on server side: %w", shardID, err) + } + switch req := missingOrSyncRequest.Data.(type) { + case *propertyv1.RepairRequest_PropertyMissing: + r.processPropertyMissing(s.Context(), syncShard, req.PropertyMissing, s) + case *propertyv1.RepairRequest_PropertySync: + if r.processPropertySync(s.Context(), syncShard, req.PropertySync, s, group) { + *hasPropertyUpdated = true + } + case *propertyv1.RepairRequest_WaitNextDiffer: + return nil } } - return hasSent, nil } func (r *repairGossipServer) sendEmptyDiffer(s grpclib.BidiStreamingServer[propertyv1.RepairRequest, propertyv1.RepairResponse]) error { diff --git a/bydbctl/internal/cmd/property_test.go 
b/bydbctl/internal/cmd/property_test.go index cb773acb..16675539 100644 --- a/bydbctl/internal/cmd/property_test.go +++ b/bydbctl/internal/cmd/property_test.go @@ -701,6 +701,220 @@ projection: }) }) +var _ = Describe("Property Cluster Resilience with 5 Data Nodes", func() { + Expect(logger.Init(logger.Logging{ + Env: "dev", + Level: flags.LogLevel, + })).To(Succeed()) + + var addr string + var deferFunc func() + var rootCmd *cobra.Command + var nodeIDs []string + var nodeRepairAddrs []string + var nodeDirs []string + var closeNodes []func() + var messenger gossip.Messenger + var server embeddedetcd.Server + var ep string + nodeCount := 5 + closedNodeCount := 3 + + BeforeEach(func() { + rootCmd = &cobra.Command{Use: "root"} + cmd.RootCmdFlags(rootCmd) + var ports []int + var err error + var spaceDefs []func() + + // Create 5 data nodes + nodeIDs = make([]string, nodeCount) + nodeRepairAddrs = make([]string, nodeCount) + nodeDirs = make([]string, nodeCount) + closeNodes = make([]func(), nodeCount) + spaceDefs = make([]func(), nodeCount) + + // Create data directories for 5 nodes + for i := 0; i < nodeCount; i++ { + nodeDirs[i], spaceDefs[i], err = test.NewSpace() + Expect(err).NotTo(HaveOccurred()) + } + + // Setup cluster with etcd server + By("Starting etcd server") + ports, err = test.AllocateFreePorts(2) + Expect(err).NotTo(HaveOccurred()) + dir, spaceDef, err := test.NewSpace() + Expect(err).NotTo(HaveOccurred()) + ep = fmt.Sprintf("http://127.0.0.1:%d", ports[0]) + server, err = embeddedetcd.NewServer( + embeddedetcd.ConfigureListener([]string{ep}, []string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}), + embeddedetcd.RootDir(dir), + ) + Expect(err).ShouldNot(HaveOccurred()) + <-server.ReadyNotify() + + // Start 5 data nodes + for i := 0; i < nodeCount; i++ { + By(fmt.Sprintf("Starting data node %d", i)) + nodeIDs[i], nodeRepairAddrs[i], closeNodes[i] = setup.DataNodeFromDataDir(ep, nodeDirs[i], + "--property-repair-enabled=true", "--property-repair-quick-build-tree-time=1s") + // Update node ID to use 127.0.0.1 + _, nodePort, found := strings.Cut(nodeIDs[i], ":") + Expect(found).To(BeTrue()) + nodeIDs[i] = fmt.Sprintf("127.0.0.1:%s", nodePort) + } + + By("Starting liaison node") + _, liaisonHTTPAddr, closerLiaisonNode := setup.LiaisonNodeWithHTTP(ep) + addr = httpSchema + liaisonHTTPAddr + + By("Creating test group with shard=1, copies=5") + defUITemplateWithSchema(rootCmd, addr, 1, nodeCount) + + // Setup gossip messenger + messenger = gossip.NewMessengerWithoutMetadata(observability.NewBypassRegistry(), 9999) + messenger.Validate() + err = messenger.PreRun(context.WithValue(context.Background(), common.ContextNodeKey, common.Node{ + NodeID: "test-client", + })) + Expect(err).NotTo(HaveOccurred()) + + for i := 0; i < nodeCount; i++ { + registerNodeToMessenger(messenger, nodeIDs[i], nodeRepairAddrs[i]) + } + + deferFunc = func() { + messenger.GracefulStop() + closerLiaisonNode() + for i := 0; i < nodeCount; i++ { + if closeNodes[i] != nil { + closeNodes[i]() + } + } + _ = server.Close() + <-server.StopNotify() + spaceDef() + for i := 0; i < nodeCount; i++ { + spaceDefs[i]() + } + } + }) + + AfterEach(func() { + deferFunc() + }) + + It("should handle node failures and repairs correctly", func() { + By("Writing initial test data") + beforeFirstWrite := time.Now() + applyData(rootCmd, addr, p1YAML, true, propertyTagCount) + + By("Verifying data can be queried initially") + queryData(rootCmd, addr, propertyGroup, property1ID, 1, func(data string, resp *propertyv1.QueryResponse) { + 
Expect(data).To(ContainSubstring("foo111")) + }) + + By("Verifying repair tree regeneration after first write") + waitForRepairTreeRegeneration(nodeDirs, propertyGroup, beforeFirstWrite) + + By(fmt.Sprintf("Closing %d nodes", closedNodeCount)) + for i := 0; i < closedNodeCount; i++ { + GinkgoWriter.Printf("Closing node %d\n", i) + closeNodes[i]() + closeNodes[i] = nil + } + + By(fmt.Sprintf("Verifying data can still be queried after closing %d nodes", closedNodeCount)) + queryData(rootCmd, addr, propertyGroup, property1ID, 1, func(data string, resp *propertyv1.QueryResponse) { + Expect(data).To(ContainSubstring("foo111")) + }) + + By(fmt.Sprintf("Writing new test data with %d nodes down", closedNodeCount)) + beforeSecondWrite := time.Now() + applyData(rootCmd, addr, p3YAML, true, propertyTagCount) + + By("Verifying new data can be queried") + queryData(rootCmd, addr, propertyGroup, property2ID, 1, func(data string, resp *propertyv1.QueryResponse) { + Expect(data).To(ContainSubstring("foo-mesh")) + }) + + By("Verifying repair tree regeneration on remaining nodes after second write") + waitForRepairTreeRegeneration(nodeDirs[closedNodeCount:nodeCount], propertyGroup, beforeSecondWrite) + + By(fmt.Sprintf("Restarting the %d closed nodes with existing data directories", closedNodeCount)) + for i := 0; i < closedNodeCount; i++ { + GinkgoWriter.Printf("Restarting node %d\n", i) + nodeIDs[i], nodeRepairAddrs[i], closeNodes[i] = setup.DataNodeFromDataDir(ep, nodeDirs[i], + "--property-repair-enabled=true", "--property-repair-quick-build-tree-time=1s") + // Update node ID to use 127.0.0.1 + _, nodePort, found := strings.Cut(nodeIDs[i], ":") + Expect(found).To(BeTrue()) + nodeIDs[i] = fmt.Sprintf("127.0.0.1:%s", nodePort) + // Re-register to messenger + registerNodeToMessenger(messenger, nodeIDs[i], nodeRepairAddrs[i]) + } + + By("Triggering repair operations") + err := messenger.Propagation(nodeIDs, propertyGroup, 0) + Expect(err).NotTo(HaveOccurred()) + + By("Verifying repair tree regeneration after repair operations") + waitForRepairTreeRegeneration(nodeDirs, propertyGroup, beforeSecondWrite) + + By("Closing all nodes before checking data consistency") + for i := 0; i < nodeCount; i++ { + if closeNodes[i] != nil { + GinkgoWriter.Printf("Closing node %d for data consistency check\n", i) + closeNodes[i]() + closeNodes[i] = nil + } + } + + By("Verifying data consistency across all nodes after repair") + Eventually(func() bool { + allNodesConsistent := true + for i := 0; i < nodeCount; i++ { + store, err := generateInvertedStore(nodeDirs[i]) + if err != nil { + GinkgoWriter.Printf("Node %d store error: %v\n", i, err) + allNodesConsistent = false + continue + } + query, err := inverted.BuildPropertyQuery(&propertyv1.QueryRequest{ + Groups: []string{propertyGroup}, + }, "_group", "_entity_id") + if err != nil { + GinkgoWriter.Printf("Node %d query build error: %v\n", i, err) + allNodesConsistent = false + continue + } + searchResult, err := store.Search(context.Background(), []index.FieldKey{sourceFieldKey, deletedFieldKey}, query, 10) + if err != nil { + GinkgoWriter.Printf("Node %d search error: %v\n", i, err) + allNodesConsistent = false + continue + } + + // Filter non-deleted properties + nonDeletedCount := 0 + for _, result := range searchResult { + deleted := convert.BytesToBool(result.Fields[deletedFieldKey.TagName]) + if !deleted { + nonDeletedCount++ + } + } + + GinkgoWriter.Printf("Node %d has %d total properties, %d non-deleted\n", i, len(searchResult), nonDeletedCount) + if nonDeletedCount 
< 2 { + allNodesConsistent = false + } + } + return allNodesConsistent + }, flags.EventuallyTimeout).Should(BeTrue()) + }) +}) + func filterProperties(doc []index.SeriesDocument, filter func(property *propertyv1.Property, deleted bool) bool) (res []*propertyv1.Property) { for _, p := range doc { deleted := convert.BytesToBool(p.Fields[deletedFieldKey.TagName]) @@ -867,3 +1081,33 @@ func registerNodeToMessenger(m gossip.Messenger, nodeID, gossipRepairAddr string }, }) } + +func getRepairTreeFilePath(nodeDir, group string) string { + return path.Join(nodeDir, "property", "repairs", "shard0", fmt.Sprintf("state-tree-%s.data", group)) +} + +func getRepairTreeModTime(nodeDir, group string) (time.Time, error) { + filePath := getRepairTreeFilePath(nodeDir, group) + info, err := os.Stat(filePath) + if err != nil { + return time.Time{}, err + } + return info.ModTime(), nil +} + +func waitForRepairTreeRegeneration(nodeDirs []string, group string, beforeTime time.Time) { + Eventually(func() bool { + allRegenerated := true + for _, nodeDir := range nodeDirs { + modTime, err := getRepairTreeModTime(nodeDir, group) + if err != nil { + allRegenerated = false + continue + } + if !modTime.After(beforeTime) { + allRegenerated = false + } + } + return allRegenerated + }, flags.EventuallyTimeout).Should(BeTrue(), "All nodes should regenerate repair tree after data write") +} diff --git a/docs/api-reference.md b/docs/api-reference.md index 2fb6bbd0..e7d778cc 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -258,9 +258,9 @@ - [banyandb/property/v1/repair.proto](#banyandb_property_v1_repair-proto) - [DifferTreeSummary](#banyandb-property-v1-DifferTreeSummary) - - [NoMorePropertySync](#banyandb-property-v1-NoMorePropertySync) - [PropertyMissing](#banyandb-property-v1-PropertyMissing) - [PropertySync](#banyandb-property-v1-PropertySync) + - [PropertySyncWithFrom](#banyandb-property-v1-PropertySyncWithFrom) - [RepairRequest](#banyandb-property-v1-RepairRequest) - [RepairResponse](#banyandb-property-v1-RepairResponse) - [RootCompare](#banyandb-property-v1-RootCompare) @@ -268,6 +268,9 @@ - [TreeRoot](#banyandb-property-v1-TreeRoot) - [TreeSlotSHA](#banyandb-property-v1-TreeSlotSHA) - [TreeSlots](#banyandb-property-v1-TreeSlots) + - [WaitNextDifferData](#banyandb-property-v1-WaitNextDifferData) + + - [PropertySyncFromType](#banyandb-property-v1-PropertySyncFromType) - [RepairService](#banyandb-property-v1-RepairService) @@ -3897,16 +3900,6 @@ Property stores the user defined data -<a name="banyandb-property-v1-NoMorePropertySync"></a> - -### NoMorePropertySync - - - - - - - <a name="banyandb-property-v1-PropertyMissing"></a> ### PropertyMissing @@ -3939,6 +3932,22 @@ Property stores the user defined data +<a name="banyandb-property-v1-PropertySyncWithFrom"></a> + +### PropertySyncWithFrom + + + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| from | [PropertySyncFromType](#banyandb-property-v1-PropertySyncFromType) | | | +| property | [PropertySync](#banyandb-property-v1-PropertySync) | | | + + + + + + <a name="banyandb-property-v1-RepairRequest"></a> ### RepairRequest @@ -3951,7 +3960,7 @@ Property stores the user defined data | tree_slots | [TreeSlots](#banyandb-property-v1-TreeSlots) | | | | property_missing | [PropertyMissing](#banyandb-property-v1-PropertyMissing) | | repair stage case 1: client missing but server existing | | property_sync | [PropertySync](#banyandb-property-v1-PropertySync) | | case 2: client existing but server missing case 3: SHA value 
mismatches | -| no_more_property_sync | [NoMorePropertySync](#banyandb-property-v1-NoMorePropertySync) | | if client side is already send all the properties(missing or property sync) which means the client side will not sending more properties to sync, server side should close the stream. | +| wait_next_differ | [WaitNextDifferData](#banyandb-property-v1-WaitNextDifferData) | | wait next differ tree summary for process | @@ -3968,7 +3977,7 @@ Property stores the user defined data | ----- | ---- | ----- | ----------- | | root_compare | [RootCompare](#banyandb-property-v1-RootCompare) | | compare stage | | differ_tree_summary | [DifferTreeSummary](#banyandb-property-v1-DifferTreeSummary) | | | -| property_sync | [PropertySync](#banyandb-property-v1-PropertySync) | | repair stage case 1: return from PropertyMissing case 3: return if the client is older | +| property_sync | [PropertySyncWithFrom](#banyandb-property-v1-PropertySyncWithFrom) | | repair stage case 1: return from PropertyMissing case 3: return if the client is older | @@ -4056,8 +4065,31 @@ Property stores the user defined data + +<a name="banyandb-property-v1-WaitNextDifferData"></a> + +### WaitNextDifferData + + + + + + + +<a name="banyandb-property-v1-PropertySyncFromType"></a> + +### PropertySyncFromType + + +| Name | Number | Description | +| ---- | ------ | ----------- | +| PROPERTY_SYNC_FROM_TYPE_UNSPECIFIED | 0 | | +| PROPERTY_SYNC_FROM_TYPE_MISSING | 1 | client missing but server existing | +| PROPERTY_SYNC_FROM_TYPE_SYNC | 2 | client existing but server missing or SHA value mismatches | + + diff --git a/test/docker/base-compose.yml b/test/docker/base-compose.yml index 0488a9a7..d583fc0f 100644 --- a/test/docker/base-compose.yml +++ b/test/docker/base-compose.yml @@ -47,6 +47,7 @@ services: - 17912 - 2121 - 6060 + - 17932 command: data --etcd-endpoints=http://etcd:2379 healthcheck: test: ["CMD", "./bydbctl", "health", "--addr=http://127.0.0.1:17913"] diff --git a/test/property_repair/README.md b/test/property_repair/README.md new file mode 100644 index 00000000..56415d43 --- /dev/null +++ b/test/property_repair/README.md @@ -0,0 +1,142 @@ +# Property Repair Benchmark Test + +## Purpose + +This performance evaluation is designed to test the execution efficiency of the backend’s automated Property repair feature. +It primarily covers the following key features: +1. The cluster is configured with a maximum of three nodes, one group, one shard, and two replicas. +2. Each shard contains **100,000** records, with each record approximately **2KB** in size. + +## Requirements + +- Docker and Docker Compose +- Go 1.21+ + +### Building the BanyanDB Docker Image + +Please make sure you have the latest version of the BanyanDB codebase, and building the Docker image is essential before running the tests. + +```bash +export TARGET_OS=linux +export PLATFORMS=linux/arm64 # please replace to your platform +make clean && make generate && make release && make docker.build +``` + +## Monitoring + +The performance evaluation is primarily conducted by observing logs and monitoring metrics in Prometheus. + +The logs provide clear markers for the start and end times of the backend repair process. +In Prometheus, by visiting `http://localhost:9090`, you can view system performance metrics for each machine in the cluster. 
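+
+For example, assuming the standard Go process collector is exposed on the observability
+endpoint (the same collector that provides `process_cpu_seconds_total` used in the next
+section), per-node memory can be tracked with a query along these lines:
+
+```promql
+# approximate resident memory per data node, in MiB
+avg by (instance) (process_resident_memory_bytes / 1024 / 1024)
+```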
+
+### CPU Usage Monitoring
+
+Use this PromQL query to monitor CPU usage during property repair:
+
+```promql
+avg by (instance) (
+  rate(process_cpu_seconds_total[1m]) * 100
+)
+```
+
+## Case 1: Full Data Property Repair
+
+In the first test case, a brand-new, empty node will be started,
+and **100,000** records will be synchronized to it in a single batch.
+This test is designed to measure the node's CPU usage and the total time consumed during the process.
+
+### Running the Integrated Test
+
+The full data test case runs as an integrated test that handles all steps automatically:
+
+```bash
+cd test/property_repair/full_data
+# Run the complete integrated test
+go test . -v -timeout 3h -count=1
+```
+
+The test performs the following steps:
+1. Starts a 3-node cluster using docker-compose
+2. Creates a group with 1 replica and loads 100,000 properties
+3. Updates the group to 2 replicas to trigger property repair
+4. Monitors the repair process through Prometheus metrics
+5. Verifies both propagation count and repair success count increase
+
+Then, wait for the propagation to complete in the cluster.
+
+### Result
+
+After waiting for the Property Repair process to complete, the following information was recorded:
+1. **Duration**: The total estimated time taken was approximately **36 minutes**.
+2. **CPU Consumption**: The estimated CPU usage on the new node was about **1.4 CPU cores**.
+
+The detailed CPU usage rate is shown in the figure below.
+
+
+
+## Case 2: Half-Data Property Repair
+
+In the second test case, three nodes are started, with the group’s number of copies initially set to two.
+First, **50,000** records are written to all three nodes.
+Next, the group’s `copies` setting is changed to one, and the remaining **50,000** records are written to only two fixed nodes.
+At this point, the third node’s dataset is missing half of the data compared to the other nodes.
+Finally, the group’s `copies` setting is changed back to two, allowing the backend Property Repair process to perform the data synchronization automatically.
+
+### Running the Integrated Test
+
+This test case runs as an integrated test that handles all steps automatically:
+
+```bash
+cd test/property_repair/half_data
+# Run the complete integrated test
+go test . -v -timeout 3h -count=1
+```
+
+The test performs the following steps:
+1. Starts a 3-node cluster using docker-compose
+2. Creates a group with 2 replicas and loads 50,000 properties
+3. Reduces the group to 1 replica
+4. Writes additional 50,000 properties (creating data inconsistency)
+5. Increases the group back to 2 replicas to trigger property repair
+6. Monitors the repair process through Prometheus metrics
+7. Verifies both propagation count and repair success count increase
+
+Then, wait for the propagation to complete in the cluster.
+
+### Result
+
+After waiting for the Property Repair process to complete, the following information was recorded:
+1. **Duration**: The total estimated time taken was approximately **30 minutes**.
+2. **CPU Consumption**: The estimated CPU usage on the new node was about **1.1 CPU cores**.
+
+The detailed CPU usage rate is shown in the figure below.
+
+
+
+## Case 3: All Nodes Data are the Same
+
+In the third test case, which represents the most common scenario, all nodes contain identical data.
+
+### Running the Integrated Test
+
+This test case runs as an integrated test that handles all steps automatically:
+
+```bash
+cd test/property_repair/same_data
+# Run the complete integrated test
+go test . 
-v -timeout 3h -count=1 +``` + +The test performs the following steps: +1. Starts a 3-node cluster using docker-compose +2. Creates a group with 2 replicas and loads 100,000 properties across all nodes +3. Monitors the repair process through Prometheus metrics +4. Verifies propagation count increases (repair count verification is skipped since data is consistent) + +Then, wait for the propagation to complete in the cluster. + +### Result + +After waiting for the Property Repair process to complete, the following information was recorded: +1. **Duration**: Almost Less than **1 minute**. +2. **CPU Consumption**: The estimated CPU usage in almost has no impact. \ No newline at end of file diff --git a/test/property_repair/base-compose.yml b/test/property_repair/base-compose.yml new file mode 100644 index 00000000..112a48c5 --- /dev/null +++ b/test/property_repair/base-compose.yml @@ -0,0 +1,111 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +version: '3.8' + + +x-data-base: &data_base + extends: + file: ../docker/base-compose.yml + service: data + image: apache/skywalking-banyandb:latest + depends_on: + etcd: + condition: service_healthy + cpus: 2 + mem_limit: 4g + command: &base_data_cmd | + data --etcd-endpoints=http://etcd:2379 + --observability-modes=prometheus + --observability-listener-addr=:2121 + --property-repair-enabled=true + --property-repair-build-tree-cron="@every 30s" + --stream-root-path=/tmp/banyandb-data + --measure-root-path=/tmp/banyandb-data + --property-root-path=/tmp/banyandb-data + +# Base services for property repair tests +services: + etcd: + extends: + file: ../docker/base-compose.yml + service: etcd + ports: + - "2379:2379" + + liaison: + extends: + file: ../docker/base-compose.yml + service: liaison + image: apache/skywalking-banyandb:latest + depends_on: + etcd: + condition: service_healthy + cpus: 2 + mem_limit: 3g + container_name: liaison + ports: + - "17912:17912" + - "17913:17913" + - "2121:2121" # observability port for metrics + command: liaison --etcd-endpoints=http://etcd:2379 --observability-modes=prometheus + + # Data nodes with specific configurations + data-node-1: + <<: *data_base + hostname: data-node-1 + container_name: data-node-1 + ports: + - "2122:2121" # observability port + - "6061:6060" # pprof port + command: | + data --etcd-endpoints=http://etcd:2379 + --observability-modes=prometheus + --observability-listener-addr=:2121 + --property-repair-enabled=true + --property-repair-build-tree-cron="@every 30s" + --stream-root-path=/tmp/banyandb-data + --measure-root-path=/tmp/banyandb-data + --property-root-path=/tmp/banyandb-data + --property-repair-trigger-cron="*/10 * * * *" + + data-node-2: + <<: *data_base + hostname: data-node-2 + container_name: data-node-2 + ports: + - "2123:2121" # observability port + - 
"6062:6060" # pprof port + command: *base_data_cmd + + data-node-3: + <<: *data_base + hostname: data-node-3 + container_name: data-node-3 + ports: + - "2124:2121" # observability port + - "6063:6060" # pprof port + command: *base_data_cmd + + # Prometheus for monitoring + prometheus: + image: prom/prometheus:latest + ports: + - "9090:9090" + command: + - '--config.file=/etc/prometheus/prometheus.yml' + - '--storage.tsdb.path=/prometheus' + - '--web.console.libraries=/etc/prometheus/console_libraries' + - '--web.console.templates=/etc/prometheus/consoles' \ No newline at end of file diff --git a/test/property_repair/full_data/cpu-usage.png b/test/property_repair/full_data/cpu-usage.png new file mode 100644 index 00000000..54410cad Binary files /dev/null and b/test/property_repair/full_data/cpu-usage.png differ diff --git a/test/property_repair/full_data/docker-compose-3nodes.yml b/test/property_repair/full_data/docker-compose-3nodes.yml new file mode 100644 index 00000000..20663b00 --- /dev/null +++ b/test/property_repair/full_data/docker-compose-3nodes.yml @@ -0,0 +1,65 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +version: '3.8' + +services: + etcd: + extends: + file: ../base-compose.yml + service: etcd + networks: + - property-repair + + liaison: + extends: + file: ../base-compose.yml + service: liaison + networks: + - property-repair + + data-node-1: + extends: + file: ../base-compose.yml + service: data-node-1 + networks: + - property-repair + + data-node-2: + extends: + file: ../base-compose.yml + service: data-node-2 + networks: + - property-repair + + data-node-3: + extends: + file: ../base-compose.yml + service: data-node-3 + networks: + - property-repair + + prometheus: + extends: + file: ../base-compose.yml + service: prometheus + networks: + - property-repair + volumes: + - ../prometheus-3nodes.yml:/etc/prometheus/prometheus.yml + +networks: + property-repair: + driver: bridge \ No newline at end of file diff --git a/test/property_repair/full_data/integrated_test.go b/test/property_repair/full_data/integrated_test.go new file mode 100644 index 00000000..1e4e3baa --- /dev/null +++ b/test/property_repair/full_data/integrated_test.go @@ -0,0 +1,192 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package fulldata + +import ( + "context" + "fmt" + "os" + "path/filepath" + "testing" + "time" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" + "github.com/apache/skywalking-banyandb/pkg/grpchelper" + propertyrepair "github.com/apache/skywalking-banyandb/test/property_repair" +) + +var ( + composeFile string + conn *grpc.ClientConn + groupClient databasev1.GroupRegistryServiceClient + propertyClient databasev1.PropertyRegistryServiceClient + propertyServiceClient propertyv1.PropertyServiceClient +) + +func TestPropertyRepairIntegrated(t *testing.T) { + gomega.RegisterFailHandler(ginkgo.Fail) + ginkgo.RunSpecs(t, "Property Repair Integrated Test Suite", ginkgo.Label("integration", "slow", "property_repair", "full_data")) +} + +var _ = ginkgo.BeforeSuite(func() { + fmt.Println("Starting Property Repair Integration Test Suite...") + + // Disable Ryuk reaper to avoid container creation issues + os.Setenv("TESTCONTAINERS_RYUK_DISABLED", "true") + + // Set Docker host if needed (for local development) + if os.Getenv("DOCKER_HOST") == "" { + os.Setenv("DOCKER_HOST", "unix:///var/run/docker.sock") + } +}) + +var _ = ginkgo.AfterSuite(func() { + if conn != nil { + _ = conn.Close() + } + if composeFile != "" { + fmt.Println("Stopping compose stack...") + propertyrepair.ExecuteComposeCommand("-f", composeFile, "down") + } +}) + +var _ = ginkgo.Describe("Property Repair Full Data Test", ginkgo.Ordered, func() { + ginkgo.Describe("Step 1: Initial Data Load with 2 Nodes", func() { + ginkgo.It("Should start 3 data node cluster", func() { + // Initialize compose stack with 2-node configuration + var err error + composeFile, err = filepath.Abs("docker-compose-3nodes.yml") + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + fmt.Printf("Using compose file: %s\n", composeFile) + + // Start the docker compose stack without waiting first + fmt.Println("Starting services...") + err = propertyrepair.ExecuteComposeCommand("-f", composeFile, "up", "-d") + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + // Simple wait for services to be ready + time.Sleep(10 * time.Second) + }) + + ginkgo.It("Should connect to liaison and setup clients", func() { + var err error + fmt.Println("Connecting to Liaison server...") + + conn, err = grpchelper.Conn(propertyrepair.LiaisonAddr, 30*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + fmt.Println("Connected to Liaison server successfully") + + groupClient = databasev1.NewGroupRegistryServiceClient(conn) + propertyClient = databasev1.NewPropertyRegistryServiceClient(conn) + propertyServiceClient = propertyv1.NewPropertyServiceClient(conn) + }) + + ginkgo.It("Should create group with 1 replica and write 100k properties", func() { + ctx := context.Background() + + fmt.Println("=== Step 1: Creating group with 1 replica and loading 
initial data ===") + + // Create group with 1 replica + propertyrepair.CreateGroup(ctx, groupClient, 1) + + // Create property schema + propertyrepair.CreatePropertySchema(ctx, propertyClient) + + // Write 100,000 properties + fmt.Println("Starting to write 100,000 properties...") + startTime := time.Now() + + err := propertyrepair.WriteProperties(ctx, propertyServiceClient, 0, 100000) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + duration := time.Since(startTime) + fmt.Printf("=== Step 1 completed: wrote 100,000 properties in %v ===\n", duration) + }) + }) + + ginkgo.Describe("Step 2: Add 3rd Node and Update Replicas", func() { + ginkgo.It("Should update group replicas to 2", func() { + ctx := context.Background() + + fmt.Println("Updating group replicas from 1 to 2...") + startTime := time.Now() + + // Update group replicas to 2 + propertyrepair.UpdateGroupReplicas(ctx, groupClient, 2) + + duration := time.Since(startTime) + fmt.Printf("=== Step 2 completed: updated replicas to 2 in %v ===\n", duration) + }) + }) + + ginkgo.Describe("Verification", func() { + ginkgo.It("Should verify the property repair completed and prometheus metrics", func() { + fmt.Println("=== Verification: Property repair process and prometheus metrics ===") + + // Get initial metrics from all data nodes + fmt.Println("Reading initial prometheus metrics from all data nodes...") + beforeMetrics := propertyrepair.GetAllNodeMetrics() + + // Print initial metrics state + fmt.Println("Initial metrics state:") + for _, metrics := range beforeMetrics { + gomega.Expect(metrics.IsHealthy).To(gomega.BeTrue(), + fmt.Sprintf("Node %s should be healthy before verification: %s", + metrics.NodeName, metrics.ErrorMessage)) + fmt.Printf("- %s: total_propagation_count=%d, repair_success_count=%d\n", + metrics.NodeName, metrics.TotalPropagationCount, metrics.RepairSuccessCount) + } + + fmt.Println("\n=== Triggering property repair by waiting for scheduled repair cycle ===") + fmt.Println("Waiting for property repair to trigger (@every 10 minutes)...") + + gomega.Eventually(func() bool { + time.Sleep(time.Second * 30) + // Get metrics after repair + fmt.Println("Trying to reading prometheus metrics to check repair status...") + afterMetrics := propertyrepair.GetAllNodeMetrics() + propertyrepair.PrintMetricsComparison(beforeMetrics, afterMetrics) + + // Check all node health, no crash + for _, metrics := range afterMetrics { + gomega.Expect(metrics.IsHealthy).To(gomega.BeTrue(), + fmt.Sprintf("Node %s should be healthy after repair: %s", + metrics.NodeName, metrics.ErrorMessage)) + } + + // check the property repair progress finished, and the property must be repaired (at last one success) + isAnyRepairFinished := false + for i, before := range beforeMetrics { + after := afterMetrics[i] + if before.TotalPropagationCount < after.TotalPropagationCount && + before.RepairSuccessCount < after.RepairSuccessCount { + isAnyRepairFinished = true + } + } + return isAnyRepairFinished + }, time.Hour*2).Should(gomega.BeTrue(), "Property repair cycle should complete within the expected time") + }) + }) +}) diff --git a/test/property_repair/half_data/cpu-usage.png b/test/property_repair/half_data/cpu-usage.png new file mode 100644 index 00000000..fd367531 Binary files /dev/null and b/test/property_repair/half_data/cpu-usage.png differ diff --git a/test/property_repair/half_data/docker-compose-3nodes.yml b/test/property_repair/half_data/docker-compose-3nodes.yml new file mode 100644 index 00000000..ce2a2070 --- /dev/null +++ 
b/test/property_repair/half_data/docker-compose-3nodes.yml @@ -0,0 +1,76 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +version: '3.8' + +volumes: + node1-data: {} + node2-data: {} + node3-data: {} + +services: + etcd: + extends: + file: ../base-compose.yml + service: etcd + networks: + - half-data + + liaison: + extends: + file: ../base-compose.yml + service: liaison + networks: + - half-data + + data-node-1: + extends: + file: ../base-compose.yml + service: data-node-1 + volumes: + - node1-data:/tmp/banyandb-data + networks: + - half-data + + data-node-2: + extends: + file: ../base-compose.yml + service: data-node-2 + volumes: + - node2-data:/tmp/banyandb-data + networks: + - half-data + + data-node-3: + extends: + file: ../base-compose.yml + service: data-node-3 + volumes: + - node3-data:/tmp/banyandb-data + networks: + - half-data + + prometheus: + extends: + file: ../base-compose.yml + service: prometheus + networks: + - half-data + volumes: + - ../prometheus-3nodes.yml:/etc/prometheus/prometheus.yml + +networks: + half-data: + driver: bridge \ No newline at end of file diff --git a/test/property_repair/half_data/integrated_test.go b/test/property_repair/half_data/integrated_test.go new file mode 100644 index 00000000..119d0b49 --- /dev/null +++ b/test/property_repair/half_data/integrated_test.go @@ -0,0 +1,219 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package halfdata + +import ( + "context" + "fmt" + "os" + "path/filepath" + "testing" + "time" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" + "github.com/apache/skywalking-banyandb/pkg/grpchelper" + propertyrepair "github.com/apache/skywalking-banyandb/test/property_repair" +) + +var ( + composeFile string + conn *grpc.ClientConn + groupClient databasev1.GroupRegistryServiceClient + propertyClient databasev1.PropertyRegistryServiceClient + propertyServiceClient propertyv1.PropertyServiceClient +) + +func TestPropertyRepairHalfData(t *testing.T) { + gomega.RegisterFailHandler(ginkgo.Fail) + ginkgo.RunSpecs(t, "Property Repair Half Data Test Suite", ginkgo.Label("integration", "slow", "property_repair", "half_data")) +} + +var _ = ginkgo.BeforeSuite(func() { + fmt.Println("Starting Property Repair Half Data Integration Test Suite...") + + // Disable Ryuk reaper to avoid container creation issues + os.Setenv("TESTCONTAINERS_RYUK_DISABLED", "true") + + // Set Docker host if needed (for local development) + if os.Getenv("DOCKER_HOST") == "" { + os.Setenv("DOCKER_HOST", "unix:///var/run/docker.sock") + } +}) + +var _ = ginkgo.AfterSuite(func() { + if conn != nil { + _ = conn.Close() + } + if composeFile != "" { + fmt.Println("Stopping compose stack...") + _ = propertyrepair.ExecuteComposeCommand(composeFile, "down") + } +}) + +var _ = ginkgo.Describe("Property Repair Half Data Test", ginkgo.Ordered, func() { + ginkgo.Describe("Step 1: Initial Setup and Data Load", func() { + ginkgo.It("Should start 3 data node cluster", func() { + // Initialize compose stack with 3-node configuration + var err error + composeFile, err = filepath.Abs("docker-compose-3nodes.yml") + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + fmt.Printf("Using compose file: %s\n", composeFile) + + // Start the docker compose stack without waiting first + fmt.Println("Starting services...") + err = propertyrepair.ExecuteComposeCommand("-f", composeFile, "up", "-d") + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + // Simple wait for services to be ready + time.Sleep(10 * time.Second) + }) + + ginkgo.It("Should connect to liaison and setup clients", func() { + var err error + fmt.Println("Connecting to Liaison server...") + + conn, err = grpchelper.Conn(propertyrepair.LiaisonAddr, 30*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + fmt.Println("Connected to Liaison server successfully") + + groupClient = databasev1.NewGroupRegistryServiceClient(conn) + propertyClient = databasev1.NewPropertyRegistryServiceClient(conn) + propertyServiceClient = propertyv1.NewPropertyServiceClient(conn) + }) + + ginkgo.It("Should create group with 2 replicas and write 50k properties", func() { + ctx := context.Background() + + fmt.Println("=== Step 1: Creating group with 2 replicas and loading initial data ===") + + // Create group with 2 replicas + propertyrepair.CreateGroup(ctx, groupClient, 2) + + // Create property schema + propertyrepair.CreatePropertySchema(ctx, propertyClient) + + // Write 50,000 properties + fmt.Println("Starting to write 50,000 properties...") + startTime := time.Now() + + err := propertyrepair.WriteProperties(ctx, propertyServiceClient, 0, 50000) + 
gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + duration := time.Since(startTime) + fmt.Printf("=== Step 1 completed: wrote 50,000 properties in %v ===\n", duration) + }) + }) + + ginkgo.Describe("Step 2: Reduce Replicas and Add More Data", func() { + ginkgo.It("Should reduce group replicas to 1", func() { + ctx := context.Background() + + fmt.Println("Reducing group replicas from 2 to 1...") + startTime := time.Now() + + // Update group replicas to 1 + propertyrepair.UpdateGroupReplicas(ctx, groupClient, 1) + + duration := time.Since(startTime) + fmt.Printf("=== Step 2 completed: updated replicas to 1 in %v ===\n", duration) + }) + + ginkgo.It("Should write additional 50k properties", func() { + ctx := context.Background() + + fmt.Println("=== Step 3: Writing additional properties ===") + startTime := time.Now() + + // Write another 50,000 properties (50000-100000) + err := propertyrepair.WriteProperties(ctx, propertyServiceClient, 50000, 100000) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + duration := time.Since(startTime) + fmt.Printf("=== Step 3 completed: wrote additional 50,000 properties in %v ===\n", duration) + }) + + ginkgo.It("Should increase group replicas back to 2", func() { + ctx := context.Background() + + fmt.Println("Increasing group replicas from 1 to 2...") + startTime := time.Now() + + // Update group replicas to 2 + propertyrepair.UpdateGroupReplicas(ctx, groupClient, 2) + + duration := time.Since(startTime) + fmt.Printf("=== Step 4 completed: updated replicas to 2 in %v ===\n", duration) + }) + }) + + ginkgo.Describe("Verification", func() { + ginkgo.It("Should verify the property repair completed and prometheus metrics", func() { + fmt.Println("=== Verification: Property repair process and prometheus metrics ===") + + // Get initial metrics from all data nodes + fmt.Println("Reading initial prometheus metrics from all data nodes...") + beforeMetrics := propertyrepair.GetAllNodeMetrics() + + // Print initial metrics state + fmt.Println("Initial metrics state:") + for _, metrics := range beforeMetrics { + gomega.Expect(metrics.IsHealthy).To(gomega.BeTrue(), + fmt.Sprintf("Node %s should be healthy before verification: %s", + metrics.NodeName, metrics.ErrorMessage)) + fmt.Printf("- %s: total_propagation_count=%d, repair_success_count=%d\n", + metrics.NodeName, metrics.TotalPropagationCount, metrics.RepairSuccessCount) + } + + fmt.Println("\n=== Triggering property repair by waiting for scheduled repair cycle ===") + fmt.Println("Waiting for property repair to trigger (@every 10 minutes)...") + + gomega.Eventually(func() bool { + time.Sleep(time.Second * 30) + // Get metrics after repair + fmt.Println("Trying to reading prometheus metrics to check repair status...") + afterMetrics := propertyrepair.GetAllNodeMetrics() + propertyrepair.PrintMetricsComparison(beforeMetrics, afterMetrics) + + // Check all node health, no crash + for _, metrics := range afterMetrics { + gomega.Expect(metrics.IsHealthy).To(gomega.BeTrue(), + fmt.Sprintf("Node %s should be healthy after repair: %s", + metrics.NodeName, metrics.ErrorMessage)) + } + + // check the property repair progress finished, and the property must be repaired (at last one success) + isAnyRepairFinished := false + for i, before := range beforeMetrics { + after := afterMetrics[i] + if before.TotalPropagationCount < after.TotalPropagationCount && + before.RepairSuccessCount < after.RepairSuccessCount { + isAnyRepairFinished = true + } + } + return isAnyRepairFinished + }, time.Hour*2).Should(gomega.BeTrue(), 
"Property repair cycle should complete within the expected time") + }) + }) +}) diff --git a/test/property_repair/prometheus-3nodes.yml b/test/property_repair/prometheus-3nodes.yml new file mode 100644 index 00000000..23968dfc --- /dev/null +++ b/test/property_repair/prometheus-3nodes.yml @@ -0,0 +1,43 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +global: + scrape_interval: 15s + evaluation_interval: 15s + +scrape_configs: + - job_name: 'liaison' + static_configs: + - targets: ['liaison:2121'] + scrape_interval: 5s + metrics_path: '/metrics' + + - job_name: 'data-node-1' + static_configs: + - targets: ['data-node-1:2121'] + scrape_interval: 5s + metrics_path: '/metrics' + + - job_name: 'data-node-2' + static_configs: + - targets: ['data-node-2:2121'] + scrape_interval: 5s + metrics_path: '/metrics' + + - job_name: 'data-node-3' + static_configs: + - targets: ['data-node-3:2121'] + scrape_interval: 5s + metrics_path: '/metrics' \ No newline at end of file diff --git a/test/property_repair/same_data/docker-compose-3nodes.yml b/test/property_repair/same_data/docker-compose-3nodes.yml new file mode 100644 index 00000000..e0ef9802 --- /dev/null +++ b/test/property_repair/same_data/docker-compose-3nodes.yml @@ -0,0 +1,65 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +version: '3.8' + +services: + etcd: + extends: + file: ../base-compose.yml + service: etcd + networks: + - no-data + + liaison: + extends: + file: ../base-compose.yml + service: liaison + networks: + - no-data + + data-node-1: + extends: + file: ../base-compose.yml + service: data-node-1 + networks: + - no-data + + data-node-2: + extends: + file: ../base-compose.yml + service: data-node-2 + networks: + - no-data + + data-node-3: + extends: + file: ../base-compose.yml + service: data-node-3 + networks: + - no-data + + prometheus: + extends: + file: ../base-compose.yml + service: prometheus + networks: + - no-data + volumes: + - ../prometheus-3nodes.yml:/etc/prometheus/prometheus.yml + +networks: + no-data: + driver: bridge \ No newline at end of file diff --git a/test/property_repair/same_data/integrated_test.go b/test/property_repair/same_data/integrated_test.go new file mode 100644 index 00000000..e4674c8c --- /dev/null +++ b/test/property_repair/same_data/integrated_test.go @@ -0,0 +1,178 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package samedata + +import ( + "context" + "fmt" + "os" + "path/filepath" + "testing" + "time" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" + "github.com/apache/skywalking-banyandb/pkg/grpchelper" + propertyrepair "github.com/apache/skywalking-banyandb/test/property_repair" +) + +var ( + composeFile string + conn *grpc.ClientConn + groupClient databasev1.GroupRegistryServiceClient + propertyClient databasev1.PropertyRegistryServiceClient + propertyServiceClient propertyv1.PropertyServiceClient +) + +func TestPropertyRepairSameData(t *testing.T) { + gomega.RegisterFailHandler(ginkgo.Fail) + ginkgo.RunSpecs(t, "Property Repair Same Data Test Suite", ginkgo.Label("integration", "slow", "property_repair", "same_data")) +} + +var _ = ginkgo.BeforeSuite(func() { + fmt.Println("Starting Property Repair Same Data Integration Test Suite...") + + // Disable Ryuk reaper to avoid container creation issues + os.Setenv("TESTCONTAINERS_RYUK_DISABLED", "true") + + // Set Docker host if needed (for local development) + if os.Getenv("DOCKER_HOST") == "" { + os.Setenv("DOCKER_HOST", "unix:///var/run/docker.sock") + } +}) + +var _ = ginkgo.AfterSuite(func() { + if conn != nil { + _ = conn.Close() + } + if composeFile != "" { + fmt.Println("Stopping compose stack...") + propertyrepair.ExecuteComposeCommand(composeFile, "down") + } +}) + +var _ = ginkgo.Describe("Property Repair Same Data Test", ginkgo.Ordered, func() { + ginkgo.Describe("Step 1: Initial Data Load", func() { + ginkgo.It("Should start 3 data node cluster", func() { + // Initialize compose stack with 3-node configuration + var err error + composeFile, err = filepath.Abs("docker-compose-3nodes.yml") + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + fmt.Printf("Using compose file: %s\n", composeFile) + + // Start the docker compose stack without waiting first + fmt.Println("Starting services...") + err = propertyrepair.ExecuteComposeCommand("-f", composeFile, "up", "-d") + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + // Simple wait for services to be ready + time.Sleep(10 * time.Second) + }) + + ginkgo.It("Should connect to liaison and setup clients", func() { + var err error + fmt.Println("Connecting to Liaison server...") + + conn, err = grpchelper.Conn(propertyrepair.LiaisonAddr, 30*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + fmt.Println("Connected to Liaison server successfully") + + groupClient = databasev1.NewGroupRegistryServiceClient(conn) + propertyClient = databasev1.NewPropertyRegistryServiceClient(conn) + propertyServiceClient = propertyv1.NewPropertyServiceClient(conn) + }) + + ginkgo.It("Should create group with 2 replicas and write 100k properties", func() { + ctx := context.Background() + + fmt.Println("=== Step 1: Creating group with 2 replicas and loading data ===") + + // Create group with 2 replicas + propertyrepair.CreateGroup(ctx, groupClient, 2) + + // Create property schema + propertyrepair.CreatePropertySchema(ctx, propertyClient) + + // Write 100,000 properties (same amount across all replicas) + fmt.Println("Starting to write 100,000 properties...") + startTime := time.Now() + + err := propertyrepair.WriteProperties(ctx, propertyServiceClient, 0, 100000) + 
gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + duration := time.Since(startTime) + fmt.Printf("=== Step 1 completed: wrote 100,000 properties in %v ===\n", duration) + }) + }) + + ginkgo.Describe("Verification", func() { + ginkgo.It("Should verify the property repair completed and prometheus metrics", func() { + fmt.Println("=== Verification: Property repair process and prometheus metrics ===") + + // Get initial metrics from all data nodes + fmt.Println("Reading initial prometheus metrics from all data nodes...") + beforeMetrics := propertyrepair.GetAllNodeMetrics() + + // Print initial metrics state + fmt.Println("Initial metrics state:") + for _, metrics := range beforeMetrics { + gomega.Expect(metrics.IsHealthy).To(gomega.BeTrue(), + fmt.Sprintf("Node %s should be healthy before verification: %s", + metrics.NodeName, metrics.ErrorMessage)) + fmt.Printf("- %s: total_propagation_count=%d, repair_success_count=%d\n", + metrics.NodeName, metrics.TotalPropagationCount, metrics.RepairSuccessCount) + } + + fmt.Println("\n=== Triggering property repair by waiting for scheduled repair cycle ===") + fmt.Println("Waiting for property repair to trigger (@every 10 minutes)...") + + gomega.Eventually(func() bool { + time.Sleep(time.Second * 30) + // Get metrics after repair + fmt.Println("Trying to reading prometheus metrics to check repair status...") + afterMetrics := propertyrepair.GetAllNodeMetrics() + propertyrepair.PrintMetricsComparison(beforeMetrics, afterMetrics) + + // Check all node health, no crash + for _, metrics := range afterMetrics { + gomega.Expect(metrics.IsHealthy).To(gomega.BeTrue(), + fmt.Sprintf("Node %s should be healthy after repair: %s", + metrics.NodeName, metrics.ErrorMessage)) + } + + // For same data scenario, only verify propagation count increased (not repair count) + // Since data is consistent, repairs may not be needed but propagation should still occur + isPropagationActive := false + for i, before := range beforeMetrics { + after := afterMetrics[i] + if before.TotalPropagationCount < after.TotalPropagationCount { + isPropagationActive = true + break + } + } + return isPropagationActive + }, time.Hour*2).Should(gomega.BeTrue(), "Property propagation should be active even with consistent data") + }) + }) +}) diff --git a/test/property_repair/shared_utils.go b/test/property_repair/shared_utils.go new file mode 100644 index 00000000..12c9cebd --- /dev/null +++ b/test/property_repair/shared_utils.go @@ -0,0 +1,428 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package propertyrepair package provides utilities for property repair performance testing in BanyanDB. 
+package propertyrepair + +import ( + "context" + "crypto/rand" + "fmt" + "io" + "net/http" + "os" + "os/exec" + "regexp" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" +) + +// Constants for property repair performance testing. +const ( + DataSize = 2048 // 2KB per property + LiaisonAddr = "localhost:17912" + Concurrency = 6 + GroupName = "perf-test-group" + PropertyName = "perf-test-property" +) + +// PrometheusEndpoints defines the prometheus endpoints for data nodes. +var PrometheusEndpoints = []string{ + "http://localhost:2122/metrics", // data-node-1 + "http://localhost:2123/metrics", // data-node-2 + "http://localhost:2124/metrics", // data-node-3 +} + +// NodeMetrics represents the metrics for a data node. +type NodeMetrics struct { + LastScrapeTime time.Time + NodeName string + ErrorMessage string + TotalPropagationCount int64 + RepairSuccessCount int64 + IsHealthy bool +} + +// GenerateLargeData creates a string of specified size filled with random characters. +func GenerateLargeData(size int) string { + const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + + // Generate some random bytes + randomBytes := make([]byte, 32) + _, err := rand.Read(randomBytes) + if err != nil { + // Fallback to timestamp-based data + baseData := fmt.Sprintf("timestamp-%d-", time.Now().UnixNano()) + repeats := size / len(baseData) + if repeats == 0 { + repeats = 1 + } + return strings.Repeat(baseData, repeats)[:size] + } + + // Create base string from random bytes + var baseBuilder strings.Builder + for _, b := range randomBytes { + baseBuilder.WriteByte(charset[b%byte(len(charset))]) + } + baseData := baseBuilder.String() + + // Repeat to reach desired size + repeats := (size / len(baseData)) + 1 + result := strings.Repeat(baseData, repeats) + + if len(result) > size { + return result[:size] + } + return result +} + +// FormatDuration formats a duration to a human-readable string. +func FormatDuration(duration time.Duration) string { + if duration < time.Second { + return fmt.Sprintf("%dms", duration.Milliseconds()) + } + if duration < time.Minute { + return fmt.Sprintf("%.1fs", duration.Seconds()) + } + return fmt.Sprintf("%.1fm", duration.Minutes()) +} + +// FormatThroughput calculates and formats throughput. +func FormatThroughput(count int64, duration time.Duration) string { + if duration == 0 { + return "N/A" + } + throughput := float64(count) / duration.Seconds() + return fmt.Sprintf("%.1f/s", throughput) +} + +// CreateGroup creates a property group with specified parameters. 
+func CreateGroup(ctx context.Context, groupClient databasev1.GroupRegistryServiceClient, replicaNum uint32) { + fmt.Printf("Creating group %s with %d replicas...\n", GroupName, replicaNum) + _, err := groupClient.Create(ctx, &databasev1.GroupRegistryServiceCreateRequest{ + Group: &commonv1.Group{ + Metadata: &commonv1.Metadata{ + Name: GroupName, + }, + Catalog: commonv1.Catalog_CATALOG_PROPERTY, + ResourceOpts: &commonv1.ResourceOpts{ + ShardNum: 1, + Replicas: replicaNum, + }, + }, + }) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) +} + +// UpdateGroupReplicas updates the replica number of an existing group. +func UpdateGroupReplicas(ctx context.Context, groupClient databasev1.GroupRegistryServiceClient, newReplicaNum uint32) { + fmt.Printf("Updating group %s to %d replicas...\n", GroupName, newReplicaNum) + _, err := groupClient.Update(ctx, &databasev1.GroupRegistryServiceUpdateRequest{ + Group: &commonv1.Group{ + Metadata: &commonv1.Metadata{ + Name: GroupName, + }, + Catalog: commonv1.Catalog_CATALOG_PROPERTY, + ResourceOpts: &commonv1.ResourceOpts{ + ShardNum: 1, + Replicas: newReplicaNum, + }, + }, + }) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) +} + +// CreatePropertySchema creates a property schema. +func CreatePropertySchema(ctx context.Context, propertyClient databasev1.PropertyRegistryServiceClient) { + fmt.Printf("Creating property schema %s...\n", PropertyName) + _, err := propertyClient.Create(ctx, &databasev1.PropertyRegistryServiceCreateRequest{ + Property: &databasev1.Property{ + Metadata: &commonv1.Metadata{ + Name: PropertyName, + Group: GroupName, + }, + Tags: []*databasev1.TagSpec{ + {Name: "data", Type: databasev1.TagType_TAG_TYPE_STRING}, + {Name: "timestamp", Type: databasev1.TagType_TAG_TYPE_STRING}, + }, + }, + }) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) +} + +// WriteProperties writes a batch of properties concurrently. 
+func WriteProperties(ctx context.Context, propertyServiceClient propertyv1.PropertyServiceClient, + startIdx int, endIdx int, +) error { + fmt.Printf("Starting to write %d-%d properties using %d goroutines...\n", + startIdx, endIdx, Concurrency) + + startTime := time.Now() + + // Channel to generate property data + dataChannel := make(chan int, 1000) // Buffer for property indices + var wg sync.WaitGroup + var totalProcessed int64 + + // Start data producer goroutine + go func() { + defer close(dataChannel) + for i := startIdx; i < endIdx; i++ { + dataChannel <- i + } + }() + + // Start consumer goroutines + for i := 0; i < Concurrency; i++ { + wg.Add(1) + go func(workerID int) { + defer ginkgo.GinkgoRecover() + defer wg.Done() + var count int64 + + for propertyIndex := range dataChannel { + propertyID := fmt.Sprintf("property-%d", propertyIndex) + largeData := GenerateLargeData(DataSize) + timestamp := time.Now().Format(time.RFC3339Nano) + + _, writeErr := propertyServiceClient.Apply(ctx, &propertyv1.ApplyRequest{ + Property: &propertyv1.Property{ + Metadata: &commonv1.Metadata{ + Name: PropertyName, + Group: GroupName, + }, + Id: propertyID, + Tags: []*modelv1.Tag{ + {Key: "data", Value: &modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: largeData}}}}, + {Key: "timestamp", Value: &modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: timestamp}}}}, + }, + }, + }) + gomega.Expect(writeErr).NotTo(gomega.HaveOccurred()) + + count++ + atomic.AddInt64(&totalProcessed, 1) + + if atomic.LoadInt64(&totalProcessed)%500 == 0 { + elapsed := time.Since(startTime) + totalCount := atomic.LoadInt64(&totalProcessed) + fmt.Printf("total processed: %d, use: %v\n", totalCount, elapsed) + } + } + + fmt.Printf("Worker %d completed: processed %d properties total\n", workerID, count) + }(i) + } + + wg.Wait() + endTime := time.Now() + duration := endTime.Sub(startTime) + fmt.Printf("Write completed: %d properties in %s (%s props/sec)\n", + endIdx, FormatDuration(duration), FormatThroughput(int64(endIdx), duration)) + return nil +} + +// GetNodeMetrics fetches prometheus metrics from a single data node endpoint. +func GetNodeMetrics(endpoint string, nodeIndex int) *NodeMetrics { + nodeName := fmt.Sprintf("data-node-%d", nodeIndex+1) + metrics := &NodeMetrics{ + NodeName: nodeName, + LastScrapeTime: time.Now(), + IsHealthy: false, + } + + // Set timeout for HTTP request + client := &http.Client{ + Timeout: 10 * time.Second, + } + + resp, err := client.Get(endpoint) + if err != nil { + metrics.ErrorMessage = fmt.Sprintf("Failed to connect to %s: %v", endpoint, err) + return metrics + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + metrics.ErrorMessage = fmt.Sprintf("HTTP error %d from %s", resp.StatusCode, endpoint) + return metrics + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + metrics.ErrorMessage = fmt.Sprintf("Failed to read response from %s: %v", endpoint, err) + return metrics + } + + // Parse metrics from prometheus data + content := string(body) + totalPropagationCount := parseTotalPropagationCount(content) + repairSuccessCount := parseRepairSuccessCount(content) + + metrics.TotalPropagationCount = totalPropagationCount + metrics.RepairSuccessCount = repairSuccessCount + metrics.IsHealthy = true + return metrics +} + +// parseTotalPropagationCount parses the total_propagation_count from prometheus metrics text. 
+func parseTotalPropagationCount(content string) int64 { + // Look for metric lines like: banyandb_property_repair_gossip_server_total_propagation_count{group="perf-test-group",original_node="data-node-1:17912"} 3 + re := regexp.MustCompile(`banyandb_property_repair_gossip_server_total_propagation_count\{[^}]+\}\s+(\d+(?:\.\d+)?)`) + matches := re.FindAllStringSubmatch(content, -1) + + var totalCount int64 + for _, match := range matches { + if len(match) >= 2 { + value, err := strconv.ParseFloat(match[1], 64) + if err != nil { + continue + } + totalCount += int64(value) + } + } + + return totalCount +} + +// parseRepairSuccessCount parses the repair_success_count from prometheus metrics text. +func parseRepairSuccessCount(content string) int64 { + // Look for metric lines like: banyandb_property_scheduler_property_repair_success_count{group="perf-test-group",shard="0"} 100 + re := regexp.MustCompile(`banyandb_property_scheduler_property_repair_success_count\{[^}]+\}\s+(\d+(?:\.\d+)?)`) + matches := re.FindAllStringSubmatch(content, -1) + + var totalCount int64 + for _, match := range matches { + if len(match) >= 2 { + value, err := strconv.ParseFloat(match[1], 64) + if err != nil { + continue + } + totalCount += int64(value) + } + } + + return totalCount +} + +// GetAllNodeMetrics fetches metrics from all data nodes concurrently. +func GetAllNodeMetrics() []*NodeMetrics { + var wg sync.WaitGroup + metrics := make([]*NodeMetrics, len(PrometheusEndpoints)) + + for i, endpoint := range PrometheusEndpoints { + wg.Add(1) + go func(index int, url string) { + defer wg.Done() + metrics[index] = GetNodeMetrics(url, index) + }(i, endpoint) + } + + wg.Wait() + return metrics +} + +// VerifyPropagationCountIncreased compares metrics before and after to verify total_propagation_count increased by exactly 1. +func VerifyPropagationCountIncreased(beforeMetrics, afterMetrics []*NodeMetrics) error { + if len(beforeMetrics) != len(afterMetrics) { + return fmt.Errorf("metrics array length mismatch: before=%d, after=%d", len(beforeMetrics), len(afterMetrics)) + } + + for i, after := range afterMetrics { + before := beforeMetrics[i] + + if !after.IsHealthy { + return fmt.Errorf("node %s is not healthy after update: %s", after.NodeName, after.ErrorMessage) + } + + if !before.IsHealthy { + return fmt.Errorf("node %s was not healthy before update: %s", before.NodeName, before.ErrorMessage) + } + + expectedCount := before.TotalPropagationCount + 1 + if after.TotalPropagationCount != expectedCount { + return fmt.Errorf("node %s propagation count mismatch: expected=%d, actual=%d (before=%d)", + after.NodeName, expectedCount, after.TotalPropagationCount, before.TotalPropagationCount) + } + } + + return nil +} + +// PrintMetricsComparison prints a comparison of metrics before and after. 
+func PrintMetricsComparison(beforeMetrics, afterMetrics []*NodeMetrics) { + fmt.Println("=== Prometheus Metrics Comparison ===") + fmt.Printf("%-12s | %-29s | %-29s | %-7s\n", "Node", "Propagation Count", "Repair Success Count", "Healthy") + fmt.Printf("%-12s | %-9s %-9s %-9s | %-9s %-9s %-9s | %-7s\n", "", "Before", "After", "Delta", "Before", "After", "Delta", "") + fmt.Println(strings.Repeat("-", 85)) + + for i, after := range afterMetrics { + if i < len(beforeMetrics) { + before := beforeMetrics[i] + propagationDelta := after.TotalPropagationCount - before.TotalPropagationCount + repairDelta := after.RepairSuccessCount - before.RepairSuccessCount + healthStatus := "✓" + if !after.IsHealthy { + healthStatus = "✗" + } + + fmt.Printf("%-12s | %-9d %-9d %-9d | %-9d %-9d %-9d | %-7s\n", + after.NodeName, + before.TotalPropagationCount, after.TotalPropagationCount, propagationDelta, + before.RepairSuccessCount, after.RepairSuccessCount, repairDelta, + healthStatus) + } + } + fmt.Println() +} + +// ExecuteComposeCommand executes a docker-compose command, supporting both v1 and v2. +func ExecuteComposeCommand(args ...string) error { + // v2 + if _, err := exec.LookPath("docker"); err == nil { + check := exec.Command("docker", "compose", "version") + if out, err := check.CombinedOutput(); err == nil && strings.Contains(string(out), "Docker Compose") { + composeArgs := append([]string{"compose"}, args...) + cmd := exec.Command("docker", composeArgs...) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + return cmd.Run() + } + } + + // v1 + if _, err := exec.LookPath("docker-compose"); err == nil { + cmd := exec.Command("docker-compose", args...) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + return cmd.Run() + } + + return nil +}
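The verification helpers above read two counters from each data node's metrics endpoint: `banyandb_property_repair_gossip_server_total_propagation_count` and `banyandb_property_scheduler_property_repair_success_count`. When one of the compose stacks from `test/property_repair` is running locally, the same counters can be spot-checked by hand; a minimal sketch, assuming the observability ports are mapped to host ports 2122-2124 as in `base-compose.yml`:

```bash
# Scrape the repair-related counters directly from each data node's
# observability endpoint (host ports 2122-2124 per base-compose.yml).
for port in 2122 2123 2124; do
  echo "== data node on :${port} =="
  curl -s "http://localhost:${port}/metrics" \
    | grep -E 'total_propagation_count|property_repair_success_count' \
    || echo "no repair metrics exposed yet"
done
```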