hanahmily commented on code in PR #686: URL: https://github.com/apache/skywalking-banyandb/pull/686#discussion_r2165226894
########## banyand/liaison/grpc/property.go: ########## @@ -405,11 +411,73 @@ func (ps *propertyServer) Query(ctx context.Context, req *propertyv1.QueryReques return &propertyv1.QueryResponse{Properties: properties, Trace: trace}, nil } +func (ps *propertyServer) repairPropertyIfNeed(ctx context.Context, entity string, p *propertyWithCount, groups map[string]*commonv1.Group) error { + // make sure have the enough replicas + group := groups[p.Metadata.Group] + if group == nil { + return errors.Errorf("group %s not found", p.Metadata.Group) + } + copies, ok := ps.groupRepo.copies(p.Metadata.GetGroup()) + if !ok { + return errors.New("failed to get group copies") + } + if copies == uint32(len(p.existNodes)) { + return nil + } + id, err := partition.ShardID(convert.StringToBytes(entity), group.ResourceOpts.ShardNum) + if err != nil { + return err + } + // building the repair data + repairReq := &propertyv1.InternalRepairRequest{ShardId: uint64(id)} + if p.deletedTime > 0 { // building the delete request + repairReq.Operation = &propertyv1.InternalRepairRequest_Delete{ Review Comment: If the deletion's version is less than the latest version on the data node, the deletion should be dropped. I recommend using a unique repair request that contains a "deleted" field. 
########## banyand/liaison/grpc/property.go: ########## @@ -405,11 +411,73 @@ func (ps *propertyServer) Query(ctx context.Context, req *propertyv1.QueryReques return &propertyv1.QueryResponse{Properties: properties, Trace: trace}, nil } +func (ps *propertyServer) repairPropertyIfNeed(ctx context.Context, entity string, p *propertyWithCount, groups map[string]*commonv1.Group) error { + // make sure have the enough replicas + group := groups[p.Metadata.Group] + if group == nil { + return errors.Errorf("group %s not found", p.Metadata.Group) + } + copies, ok := ps.groupRepo.copies(p.Metadata.GetGroup()) + if !ok { + return errors.New("failed to get group copies") + } + if copies == uint32(len(p.existNodes)) { + return nil + } + id, err := partition.ShardID(convert.StringToBytes(entity), group.ResourceOpts.ShardNum) + if err != nil { + return err + } + // building the repair data + repairReq := &propertyv1.InternalRepairRequest{ShardId: uint64(id)} Review Comment: repairReq should be initialized only when there are nodes that need to be repaired. 
########## banyand/liaison/grpc/property.go: ########## @@ -405,11 +411,73 @@ func (ps *propertyServer) Query(ctx context.Context, req *propertyv1.QueryReques return &propertyv1.QueryResponse{Properties: properties, Trace: trace}, nil } +func (ps *propertyServer) repairPropertyIfNeed(ctx context.Context, entity string, p *propertyWithCount, groups map[string]*commonv1.Group) error { + // make sure have the enough replicas + group := groups[p.Metadata.Group] + if group == nil { + return errors.Errorf("group %s not found", p.Metadata.Group) + } + copies, ok := ps.groupRepo.copies(p.Metadata.GetGroup()) + if !ok { + return errors.New("failed to get group copies") + } + if copies == uint32(len(p.existNodes)) { + return nil + } + id, err := partition.ShardID(convert.StringToBytes(entity), group.ResourceOpts.ShardNum) Review Comment: ```suggestion shardID, err := partition.ShardID(convert.StringToBytes(entity), group.ResourceOpts.ShardNum) ``` ########## banyand/property/db.go: ########## @@ -219,6 +221,77 @@ func (db *database) collect() { } } +func (db *database) repairFromApplyProperty(ctx context.Context, id uint64, property *propertyv1.InternalApplyProperty) error { + s, err := db.loadShard(ctx, common.ShardID(id)) + if err != nil { + return errors.WithMessagef(err, "failed to load shard %d", id) + } + olderProperties, err := db.query(ctx, &propertyv1.QueryRequest{ + Groups: []string{property.Property.Metadata.Group}, + Name: property.Property.Metadata.Name, + Ids: []string{property.Property.Id}, + }) + if err != nil { + return fmt.Errorf("query older properties failed: %w", err) + } + docIDList := make([][]byte, 0, len(olderProperties)) + for _, p := range olderProperties { + if p.deleteTime > 0 { + // If the property is already deleted, ignore it. + continue + } + docIDList = append(docIDList, p.id) Review Comment: Then, keep the latest property (whether it has been deleted or not), and update all expired properties to be deleted. 
########## banyand/property/db.go: ########## @@ -219,6 +221,77 @@ func (db *database) collect() { } } +func (db *database) repairFromApplyProperty(ctx context.Context, id uint64, property *propertyv1.InternalApplyProperty) error { Review Comment: ```suggestion func (db *database) repairFromApplyProperty(ctx context.Context, shardID uint64, property *propertyv1.InternalApplyProperty) error { ``` ########## banyand/property/db.go: ########## @@ -219,6 +221,77 @@ func (db *database) collect() { } } +func (db *database) repairFromApplyProperty(ctx context.Context, id uint64, property *propertyv1.InternalApplyProperty) error { + s, err := db.loadShard(ctx, common.ShardID(id)) + if err != nil { + return errors.WithMessagef(err, "failed to load shard %d", id) + } + olderProperties, err := db.query(ctx, &propertyv1.QueryRequest{ + Groups: []string{property.Property.Metadata.Group}, + Name: property.Property.Metadata.Name, + Ids: []string{property.Property.Id}, + }) + if err != nil { + return fmt.Errorf("query older properties failed: %w", err) + } + docIDList := make([][]byte, 0, len(olderProperties)) + for _, p := range olderProperties { + if p.deleteTime > 0 { + // If the property is already deleted, ignore it. + continue + } + docIDList = append(docIDList, p.id) Review Comment: If the property's version is older than the latest version in olderProperties, the repair should be stopped. 
########## banyand/liaison/grpc/property.go: ########## @@ -504,3 +575,23 @@ type propertyWithMetadata struct { *propertyv1.Property deletedTime int64 } + +type propertyWithCount struct { + *propertyWithMetadata + existNodes map[string]bool +} + +func newPropertyWithCounts(p *propertyWithMetadata, existNode string) *propertyWithCount { + res := &propertyWithCount{ + propertyWithMetadata: p, + } + res.AddExistNode(existNode) + return res +} + +func (p *propertyWithCount) AddExistNode(node string) { Review Comment: ```suggestion func (p *propertyWithCount) addExistNode(node string) { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@skywalking.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org