hanahmily commented on code in PR #686:
URL: 
https://github.com/apache/skywalking-banyandb/pull/686#discussion_r2165226894


##########
banyand/liaison/grpc/property.go:
##########
@@ -405,11 +411,73 @@ func (ps *propertyServer) Query(ctx context.Context, req 
*propertyv1.QueryReques
        return &propertyv1.QueryResponse{Properties: properties, Trace: trace}, 
nil
 }
 
+func (ps *propertyServer) repairPropertyIfNeed(ctx context.Context, entity 
string, p *propertyWithCount, groups map[string]*commonv1.Group) error {
+       // make sure have the enough replicas
+       group := groups[p.Metadata.Group]
+       if group == nil {
+               return errors.Errorf("group %s not found", p.Metadata.Group)
+       }
+       copies, ok := ps.groupRepo.copies(p.Metadata.GetGroup())
+       if !ok {
+               return errors.New("failed to get group copies")
+       }
+       if copies == uint32(len(p.existNodes)) {
+               return nil
+       }
+       id, err := partition.ShardID(convert.StringToBytes(entity), 
group.ResourceOpts.ShardNum)
+       if err != nil {
+               return err
+       }
+       // building the repair data
+       repairReq := &propertyv1.InternalRepairRequest{ShardId: uint64(id)}
+       if p.deletedTime > 0 { // building the delete request
+               repairReq.Operation = &propertyv1.InternalRepairRequest_Delete{

Review Comment:
   If the deletion's version is older than the latest version on the data node, 
the deletion should be dropped. I recommend using a single, unified repair 
request that contains a "deleted" field.



##########
banyand/liaison/grpc/property.go:
##########
@@ -405,11 +411,73 @@ func (ps *propertyServer) Query(ctx context.Context, req 
*propertyv1.QueryReques
        return &propertyv1.QueryResponse{Properties: properties, Trace: trace}, 
nil
 }
 
+func (ps *propertyServer) repairPropertyIfNeed(ctx context.Context, entity 
string, p *propertyWithCount, groups map[string]*commonv1.Group) error {
+       // make sure have the enough replicas
+       group := groups[p.Metadata.Group]
+       if group == nil {
+               return errors.Errorf("group %s not found", p.Metadata.Group)
+       }
+       copies, ok := ps.groupRepo.copies(p.Metadata.GetGroup())
+       if !ok {
+               return errors.New("failed to get group copies")
+       }
+       if copies == uint32(len(p.existNodes)) {
+               return nil
+       }
+       id, err := partition.ShardID(convert.StringToBytes(entity), 
group.ResourceOpts.ShardNum)
+       if err != nil {
+               return err
+       }
+       // building the repair data
+       repairReq := &propertyv1.InternalRepairRequest{ShardId: uint64(id)}

Review Comment:
   repairReq should be initialized only when there are nodes that need to be repaired.



##########
banyand/liaison/grpc/property.go:
##########
@@ -405,11 +411,73 @@ func (ps *propertyServer) Query(ctx context.Context, req 
*propertyv1.QueryReques
        return &propertyv1.QueryResponse{Properties: properties, Trace: trace}, 
nil
 }
 
+func (ps *propertyServer) repairPropertyIfNeed(ctx context.Context, entity 
string, p *propertyWithCount, groups map[string]*commonv1.Group) error {
+       // make sure have the enough replicas
+       group := groups[p.Metadata.Group]
+       if group == nil {
+               return errors.Errorf("group %s not found", p.Metadata.Group)
+       }
+       copies, ok := ps.groupRepo.copies(p.Metadata.GetGroup())
+       if !ok {
+               return errors.New("failed to get group copies")
+       }
+       if copies == uint32(len(p.existNodes)) {
+               return nil
+       }
+       id, err := partition.ShardID(convert.StringToBytes(entity), 
group.ResourceOpts.ShardNum)

Review Comment:
   ```suggestion
        shardID, err := partition.ShardID(convert.StringToBytes(entity), 
group.ResourceOpts.ShardNum)
   ```



##########
banyand/property/db.go:
##########
@@ -219,6 +221,77 @@ func (db *database) collect() {
        }
 }
 
+func (db *database) repairFromApplyProperty(ctx context.Context, id uint64, 
property *propertyv1.InternalApplyProperty) error {
+       s, err := db.loadShard(ctx, common.ShardID(id))
+       if err != nil {
+               return errors.WithMessagef(err, "failed to load shard %d", id)
+       }
+       olderProperties, err := db.query(ctx, &propertyv1.QueryRequest{
+               Groups: []string{property.Property.Metadata.Group},
+               Name:   property.Property.Metadata.Name,
+               Ids:    []string{property.Property.Id},
+       })
+       if err != nil {
+               return fmt.Errorf("query older properties failed: %w", err)
+       }
+       docIDList := make([][]byte, 0, len(olderProperties))
+       for _, p := range olderProperties {
+               if p.deleteTime > 0 {
+                       // If the property is already deleted, ignore it.
+                       continue
+               }
+               docIDList = append(docIDList, p.id)

Review Comment:
   Then, keep the latest property (whether it has been deleted or not), and 
mark all expired properties as deleted.



##########
banyand/property/db.go:
##########
@@ -219,6 +221,77 @@ func (db *database) collect() {
        }
 }
 
+func (db *database) repairFromApplyProperty(ctx context.Context, id uint64, 
property *propertyv1.InternalApplyProperty) error {

Review Comment:
   ```suggestion
   func (db *database) repairFromApplyProperty(ctx context.Context, shardID 
uint64, property *propertyv1.InternalApplyProperty) error {
   ```



##########
banyand/property/db.go:
##########
@@ -219,6 +221,77 @@ func (db *database) collect() {
        }
 }
 
+func (db *database) repairFromApplyProperty(ctx context.Context, id uint64, 
property *propertyv1.InternalApplyProperty) error {
+       s, err := db.loadShard(ctx, common.ShardID(id))
+       if err != nil {
+               return errors.WithMessagef(err, "failed to load shard %d", id)
+       }
+       olderProperties, err := db.query(ctx, &propertyv1.QueryRequest{
+               Groups: []string{property.Property.Metadata.Group},
+               Name:   property.Property.Metadata.Name,
+               Ids:    []string{property.Property.Id},
+       })
+       if err != nil {
+               return fmt.Errorf("query older properties failed: %w", err)
+       }
+       docIDList := make([][]byte, 0, len(olderProperties))
+       for _, p := range olderProperties {
+               if p.deleteTime > 0 {
+                       // If the property is already deleted, ignore it.
+                       continue
+               }
+               docIDList = append(docIDList, p.id)

Review Comment:
   If the property's version is older than the latest version in 
olderProperties, the repair should be stopped.



##########
banyand/liaison/grpc/property.go:
##########
@@ -504,3 +575,23 @@ type propertyWithMetadata struct {
        *propertyv1.Property
        deletedTime int64
 }
+
+type propertyWithCount struct {
+       *propertyWithMetadata
+       existNodes map[string]bool
+}
+
+func newPropertyWithCounts(p *propertyWithMetadata, existNode string) 
*propertyWithCount {
+       res := &propertyWithCount{
+               propertyWithMetadata: p,
+       }
+       res.AddExistNode(existNode)
+       return res
+}
+
+func (p *propertyWithCount) AddExistNode(node string) {

Review Comment:
   ```suggestion
   func (p *propertyWithCount) addExistNode(node string) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to