hanahmily commented on code in PR #686: URL: https://github.com/apache/skywalking-banyandb/pull/686#discussion_r2161885399
########## banyand/liaison/grpc/property.go: ########## @@ -357,34 +357,76 @@ func (ps *propertyServer) Query(ctx context.Context, req *propertyv1.QueryReques req.Limit = 100 } - nodeProperties, trace, err := ps.queryProperties(ctx, req) + nodeProperties, groups, trace, err := ps.queryProperties(ctx, req) if err != nil { return nil, err } - res := make(map[string]*propertyWithMetadata) - for _, nodeWithProperties := range nodeProperties { + res := make(map[string]*propertyWithCount) + shouldDeleteOlderProperties := make([][]byte, 0) + for n, nodeWithProperties := range nodeProperties { for _, propertyMetadata := range nodeWithProperties { entity := propertypkg.GetEntity(propertyMetadata.Property) cur, ok := res[entity] if !ok { - res[entity] = propertyMetadata + res[entity] = newPropertyWithCounts(propertyMetadata, n) continue } - if cur.Metadata.ModRevision < propertyMetadata.Metadata.ModRevision { - res[entity] = propertyMetadata - // TODO(mrproliu) handle the case where the property detected multiple versions + switch { + case cur.Metadata.ModRevision < propertyMetadata.Metadata.ModRevision: // newer revision + res[entity] = newPropertyWithCounts(propertyMetadata, n) + shouldDeleteOlderProperties = append(shouldDeleteOlderProperties, propertypkg.GetPropertyID(cur.Property)) + case cur.Metadata.ModRevision == propertyMetadata.Metadata.ModRevision: // same revision + cur.AddExistNode(n) + default: // cur.Metadata.ModRevision > propertyMetadata.Metadata.ModRevision + shouldDeleteOlderProperties = append(shouldDeleteOlderProperties, propertypkg.GetPropertyID(propertyMetadata.Property)) } } } + if len(shouldDeleteOlderProperties) > 0 { + if err := ps.remove(shouldDeleteOlderProperties); err != nil { + ps.log.Warn().Msgf("failed to delete old properties when query: %v", err) + } + } if len(res) == 0 { return &propertyv1.QueryResponse{Properties: nil, Trace: trace}, nil } properties := make([]*propertyv1.Property, 0, len(res)) - for _, p := range res { + for entity, p := range res { // ignore deletedTime property if p.deletedTime > 0 { continue } + + // make sure have the enough replicas + group := groups[p.Metadata.Group] + if group == nil { + return nil, errors.Errorf("group %s not found", p.Metadata.Group) + } + copies, ok := ps.groupRepo.copies(p.Metadata.GetGroup()) + if !ok { + return nil, errors.New("failed to get group copies") + } + if copies != uint32(len(p.existNodes)) { + id, err := partition.ShardID(convert.StringToBytes(entity), group.ResourceOpts.ShardNum) + if err != nil { + return nil, err + } + nodes := make([]string, 0, copies) + for i := range copies { Review Comment: You should repair the node with expired data, not all nodes. Your current solution will introduce more traffic. ########## banyand/liaison/grpc/property.go: ########## @@ -357,34 +357,76 @@ func (ps *propertyServer) Query(ctx context.Context, req *propertyv1.QueryReques req.Limit = 100 } - nodeProperties, trace, err := ps.queryProperties(ctx, req) + nodeProperties, groups, trace, err := ps.queryProperties(ctx, req) if err != nil { return nil, err } - res := make(map[string]*propertyWithMetadata) - for _, nodeWithProperties := range nodeProperties { + res := make(map[string]*propertyWithCount) + shouldDeleteOlderProperties := make([][]byte, 0) + for n, nodeWithProperties := range nodeProperties { for _, propertyMetadata := range nodeWithProperties { entity := propertypkg.GetEntity(propertyMetadata.Property) cur, ok := res[entity] if !ok { - res[entity] = propertyMetadata + res[entity] = newPropertyWithCounts(propertyMetadata, n) continue } - if cur.Metadata.ModRevision < propertyMetadata.Metadata.ModRevision { - res[entity] = propertyMetadata - // TODO(mrproliu) handle the case where the property detected multiple versions + switch { + case cur.Metadata.ModRevision < propertyMetadata.Metadata.ModRevision: // newer revision + res[entity] = newPropertyWithCounts(propertyMetadata, n) + shouldDeleteOlderProperties = append(shouldDeleteOlderProperties, propertypkg.GetPropertyID(cur.Property)) + case cur.Metadata.ModRevision == propertyMetadata.Metadata.ModRevision: // same revision + cur.AddExistNode(n) + default: // cur.Metadata.ModRevision > propertyMetadata.Metadata.ModRevision + shouldDeleteOlderProperties = append(shouldDeleteOlderProperties, propertypkg.GetPropertyID(propertyMetadata.Property)) } } } + if len(shouldDeleteOlderProperties) > 0 { + if err := ps.remove(shouldDeleteOlderProperties); err != nil { Review Comment: To enhance performance, implement a repair operation in the data nodes. This operation receives the latest data, deletes all expired data, and then applies the latest one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@skywalking.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org