mrproliu commented on code in PR #686: URL: https://github.com/apache/skywalking-banyandb/pull/686#discussion_r2165283490
########## banyand/property/db.go: ########## @@ -219,6 +221,77 @@ func (db *database) collect() { } } +func (db *database) repairFromApplyProperty(ctx context.Context, id uint64, property *propertyv1.InternalApplyProperty) error { + s, err := db.loadShard(ctx, common.ShardID(id)) + if err != nil { + return errors.WithMessagef(err, "failed to load shard %d", id) + } + olderProperties, err := db.query(ctx, &propertyv1.QueryRequest{ + Groups: []string{property.Property.Metadata.Group}, + Name: property.Property.Metadata.Name, + Ids: []string{property.Property.Id}, + }) + if err != nil { + return fmt.Errorf("query older properties failed: %w", err) + } + docIDList := make([][]byte, 0, len(olderProperties)) + for _, p := range olderProperties { + if p.deleteTime > 0 { + // If the property is already deleted, ignore it. + continue + } + docIDList = append(docIDList, p.id) Review Comment: You mean the liaison node may request an older version to repair? ########## banyand/liaison/grpc/property.go: ########## @@ -405,11 +411,73 @@ func (ps *propertyServer) Query(ctx context.Context, req *propertyv1.QueryReques return &propertyv1.QueryResponse{Properties: properties, Trace: trace}, nil } +func (ps *propertyServer) repairPropertyIfNeed(ctx context.Context, entity string, p *propertyWithCount, groups map[string]*commonv1.Group) error { + // make sure have the enough replicas + group := groups[p.Metadata.Group] + if group == nil { + return errors.Errorf("group %s not found", p.Metadata.Group) + } + copies, ok := ps.groupRepo.copies(p.Metadata.GetGroup()) + if !ok { + return errors.New("failed to get group copies") + } + if copies == uint32(len(p.existNodes)) { + return nil + } + id, err := partition.ShardID(convert.StringToBytes(entity), group.ResourceOpts.ShardNum) + if err != nil { + return err + } + // building the repair data + repairReq := &propertyv1.InternalRepairRequest{ShardId: uint64(id)} Review Comment: Yeah. 
I have checked that the replica count of the latest version is the same as the group's replica count: https://github.com/apache/skywalking-banyandb/pull/686/files#diff-ec9aafdd27b3b36de2fae1ed134371dd730232bbd344d77197eebd1b0927221eR424 If they are not the same, the repair request is sent only to the nodes that are out of sync. All the nodes receive the same request. ########## banyand/liaison/grpc/property.go: ########## @@ -405,11 +411,73 @@ func (ps *propertyServer) Query(ctx context.Context, req *propertyv1.QueryReques return &propertyv1.QueryResponse{Properties: properties, Trace: trace}, nil } +func (ps *propertyServer) repairPropertyIfNeed(ctx context.Context, entity string, p *propertyWithCount, groups map[string]*commonv1.Group) error { + // make sure have the enough replicas + group := groups[p.Metadata.Group] + if group == nil { + return errors.Errorf("group %s not found", p.Metadata.Group) + } + copies, ok := ps.groupRepo.copies(p.Metadata.GetGroup()) + if !ok { + return errors.New("failed to get group copies") + } + if copies == uint32(len(p.existNodes)) { + return nil + } + id, err := partition.ShardID(convert.StringToBytes(entity), group.ResourceOpts.ShardNum) + if err != nil { + return err + } + // building the repair data + repairReq := &propertyv1.InternalRepairRequest{ShardId: uint64(id)} + if p.deletedTime > 0 { // building the delete request + repairReq.Operation = &propertyv1.InternalRepairRequest_Delete{ Review Comment: The liaison node only uses the latest version of the property: https://github.com/apache/skywalking-banyandb/pull/686/files#diff-ec9aafdd27b3b36de2fae1ed134371dd730232bbd344d77197eebd1b0927221eR375 So, if the deletion's version is older than the latest version, the repair will still be sent as an apply request, not a delete request. When the apply request is sent to the data node, the existing data and all older versions will be deleted, and the new one will be applied. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@skywalking.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org