hanahmily commented on code in PR #686:
URL: 
https://github.com/apache/skywalking-banyandb/pull/686#discussion_r2161885399


##########
banyand/liaison/grpc/property.go:
##########
@@ -357,34 +357,76 @@ func (ps *propertyServer) Query(ctx context.Context, req 
*propertyv1.QueryReques
                req.Limit = 100
        }
 
-       nodeProperties, trace, err := ps.queryProperties(ctx, req)
+       nodeProperties, groups, trace, err := ps.queryProperties(ctx, req)
        if err != nil {
                return nil, err
        }
-       res := make(map[string]*propertyWithMetadata)
-       for _, nodeWithProperties := range nodeProperties {
+       res := make(map[string]*propertyWithCount)
+       shouldDeleteOlderProperties := make([][]byte, 0)
+       for n, nodeWithProperties := range nodeProperties {
                for _, propertyMetadata := range nodeWithProperties {
                        entity := 
propertypkg.GetEntity(propertyMetadata.Property)
                        cur, ok := res[entity]
                        if !ok {
-                               res[entity] = propertyMetadata
+                               res[entity] = 
newPropertyWithCounts(propertyMetadata, n)
                                continue
                        }
-                       if cur.Metadata.ModRevision < 
propertyMetadata.Metadata.ModRevision {
-                               res[entity] = propertyMetadata
-                               // TODO(mrproliu) handle the case where the 
property detected multiple versions
+                       switch {
+                       case cur.Metadata.ModRevision < 
propertyMetadata.Metadata.ModRevision: // newer revision
+                               res[entity] = 
newPropertyWithCounts(propertyMetadata, n)
+                               shouldDeleteOlderProperties = 
append(shouldDeleteOlderProperties, propertypkg.GetPropertyID(cur.Property))
+                       case cur.Metadata.ModRevision == 
propertyMetadata.Metadata.ModRevision: // same revision
+                               cur.AddExistNode(n)
+                       default: // cur.Metadata.ModRevision > 
propertyMetadata.Metadata.ModRevision
+                               shouldDeleteOlderProperties = 
append(shouldDeleteOlderProperties, 
propertypkg.GetPropertyID(propertyMetadata.Property))
                        }
                }
        }
+       if len(shouldDeleteOlderProperties) > 0 {
+               if err := ps.remove(shouldDeleteOlderProperties); err != nil {
+                       ps.log.Warn().Msgf("failed to delete old properties 
when query: %v", err)
+               }
+       }
        if len(res) == 0 {
                return &propertyv1.QueryResponse{Properties: nil, Trace: 
trace}, nil
        }
        properties := make([]*propertyv1.Property, 0, len(res))
-       for _, p := range res {
+       for entity, p := range res {
                // ignore deletedTime property
                if p.deletedTime > 0 {
                        continue
                }
+
+               // make sure have the enough replicas
+               group := groups[p.Metadata.Group]
+               if group == nil {
+                       return nil, errors.Errorf("group %s not found", 
p.Metadata.Group)
+               }
+               copies, ok := ps.groupRepo.copies(p.Metadata.GetGroup())
+               if !ok {
+                       return nil, errors.New("failed to get group copies")
+               }
+               if copies != uint32(len(p.existNodes)) {
+                       id, err := 
partition.ShardID(convert.StringToBytes(entity), group.ResourceOpts.ShardNum)
+                       if err != nil {
+                               return nil, err
+                       }
+                       nodes := make([]string, 0, copies)
+                       for i := range copies {

Review Comment:
   You should repair the node with expired data, not all nodes. Your current 
solution will introduce more traffic.  



##########
banyand/liaison/grpc/property.go:
##########
@@ -357,34 +357,76 @@ func (ps *propertyServer) Query(ctx context.Context, req 
*propertyv1.QueryReques
                req.Limit = 100
        }
 
-       nodeProperties, trace, err := ps.queryProperties(ctx, req)
+       nodeProperties, groups, trace, err := ps.queryProperties(ctx, req)
        if err != nil {
                return nil, err
        }
-       res := make(map[string]*propertyWithMetadata)
-       for _, nodeWithProperties := range nodeProperties {
+       res := make(map[string]*propertyWithCount)
+       shouldDeleteOlderProperties := make([][]byte, 0)
+       for n, nodeWithProperties := range nodeProperties {
                for _, propertyMetadata := range nodeWithProperties {
                        entity := 
propertypkg.GetEntity(propertyMetadata.Property)
                        cur, ok := res[entity]
                        if !ok {
-                               res[entity] = propertyMetadata
+                               res[entity] = 
newPropertyWithCounts(propertyMetadata, n)
                                continue
                        }
-                       if cur.Metadata.ModRevision < 
propertyMetadata.Metadata.ModRevision {
-                               res[entity] = propertyMetadata
-                               // TODO(mrproliu) handle the case where the 
property detected multiple versions
+                       switch {
+                       case cur.Metadata.ModRevision < 
propertyMetadata.Metadata.ModRevision: // newer revision
+                               res[entity] = 
newPropertyWithCounts(propertyMetadata, n)
+                               shouldDeleteOlderProperties = 
append(shouldDeleteOlderProperties, propertypkg.GetPropertyID(cur.Property))
+                       case cur.Metadata.ModRevision == 
propertyMetadata.Metadata.ModRevision: // same revision
+                               cur.AddExistNode(n)
+                       default: // cur.Metadata.ModRevision > 
propertyMetadata.Metadata.ModRevision
+                               shouldDeleteOlderProperties = 
append(shouldDeleteOlderProperties, 
propertypkg.GetPropertyID(propertyMetadata.Property))
                        }
                }
        }
+       if len(shouldDeleteOlderProperties) > 0 {
+               if err := ps.remove(shouldDeleteOlderProperties); err != nil {

Review Comment:
   To enhance performance, implement a repair operation in the data nodes. This 
operation receives the latest data, deletes all expired data, and then applies 
the latest one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to