hanahmily commented on code in PR #686: URL: https://github.com/apache/skywalking-banyandb/pull/686#discussion_r2166641898
########## banyand/property/shard.go: ########## @@ -244,13 +248,102 @@ func (s *shard) search(ctx context.Context, indexQuery index.Query, limit int, } data = append(data, &queryProperty{ id: s.Key.EntityValues, + timestamp: s.Timestamp, source: bytes, deleteTime: deleteTime, }) } return data, nil } +func (s *shard) repair(ctx context.Context, id []byte, property *propertyv1.Property, deleteTime int64) error { + iq, err := inverted.BuildPropertyQuery(&propertyv1.QueryRequest{ + Groups: []string{property.Metadata.Group}, + Name: property.Metadata.Name, + Ids: []string{property.Id}, + }, groupField, entityID) + if err != nil { + return fmt.Errorf("build property query failure: %w", err) + } + olderProperties, err := s.search(ctx, iq, 100) + if err != nil { + return fmt.Errorf("query older properties failed: %w", err) + } + sort.Sort(queryPropertySlice(olderProperties)) + // if there no older properties, we can insert the latest document. + if len(olderProperties) == 0 { + var doc *index.Document + doc, err = s.buildUpdateDocument(id, property) + if err != nil { + return fmt.Errorf("build update document failed: %w", err) + } + doc.DeletedTime = deleteTime + return s.updateDocuments(index.Documents{*doc}) + } + + // if the lastest property in shard is bigger(or equals) than the repaired property, + // then the repaired process should be stopped. + // only deleted the older properties(exclude lastest one). 
+ if olderProperties[len(olderProperties)-1].timestamp >= property.Metadata.ModRevision { + var documents []index.Document + // if the latest property mod time is equals to the repaired property mod time, + // but the delete time is different, then we need to update the latest property + if olderProperties[len(olderProperties)-1].timestamp == property.Metadata.ModRevision && + olderProperties[len(olderProperties)-1].deleteTime != deleteTime { + var updateSelfDoc *index.Document + updateSelfDoc, err = s.buildUpdateDocument(id, property) + if err != nil { + return fmt.Errorf("build update self document failed: %w", err) + } + updateSelfDoc.DeletedTime = deleteTime + documents = append(documents, *updateSelfDoc) + } + if len(olderProperties) > 1 { + docIDList := s.buildNotDeletedDocIDList(olderProperties[0 : len(olderProperties)-1]) + var deletedDocuments []index.Document + deletedDocuments, err = s.buildDeleteFromTimeDocuments(ctx, docIDList, time.Now().UnixNano()) + if err != nil { + return fmt.Errorf("build delete documents failed: %w", err) + } + documents = append(documents, deletedDocuments...) + } + return s.updateDocuments(documents) Review Comment: ```suggestion return nil ``` In this branch, the data node has a newer version than the liaison. I recommend that you stop trying to fix it in this round because you cannot repair based on an expired property. 
########## banyand/property/shard.go: ########## @@ -244,13 +248,102 @@ func (s *shard) search(ctx context.Context, indexQuery index.Query, limit int, } data = append(data, &queryProperty{ id: s.Key.EntityValues, + timestamp: s.Timestamp, source: bytes, deleteTime: deleteTime, }) } return data, nil } +func (s *shard) repair(ctx context.Context, id []byte, property *propertyv1.Property, deleteTime int64) error { + iq, err := inverted.BuildPropertyQuery(&propertyv1.QueryRequest{ + Groups: []string{property.Metadata.Group}, + Name: property.Metadata.Name, + Ids: []string{property.Id}, + }, groupField, entityID) + if err != nil { + return fmt.Errorf("build property query failure: %w", err) + } + olderProperties, err := s.search(ctx, iq, 100) + if err != nil { + return fmt.Errorf("query older properties failed: %w", err) + } + sort.Sort(queryPropertySlice(olderProperties)) + // if there no older properties, we can insert the latest document. + if len(olderProperties) == 0 { + var doc *index.Document + doc, err = s.buildUpdateDocument(id, property) + if err != nil { + return fmt.Errorf("build update document failed: %w", err) + } + doc.DeletedTime = deleteTime + return s.updateDocuments(index.Documents{*doc}) + } + + // if the lastest property in shard is bigger(or equals) than the repaired property, + // then the repaired process should be stopped. + // only deleted the older properties(exclude lastest one). 
Review Comment: ```suggestion ``` ########## banyand/property/shard.go: ########## @@ -233,13 +247,103 @@ func (s *shard) search(ctx context.Context, indexQuery index.Query, limit int, deleteTime = convert.BytesToInt64(s.Fields[deleteField]) } data = append(data, &queryProperty{ + id: s.Key.EntityValues, + timestamp: s.Timestamp, source: bytes, deleteTime: deleteTime, }) } return data, nil } +func (s *shard) repair(ctx context.Context, id []byte, property *propertyv1.Property, deleteTime int64) error { + iq, err := inverted.BuildPropertyQuery(&propertyv1.QueryRequest{ + Groups: []string{property.Metadata.Group}, + Name: property.Metadata.Name, + Ids: []string{property.Id}, + }, groupField, entityID) + if err != nil { + return fmt.Errorf("build property query failure: %w", err) + } + olderProperties, err := s.search(ctx, iq, 100) + if err != nil { + return fmt.Errorf("query older properties failed: %w", err) + } + sort.Sort(queryPropertySlice(olderProperties)) + // if there no older properties, we can insert the latest document. + if len(olderProperties) == 0 { + var doc *index.Document + doc, err = s.buildUpdateDocument(id, property) + if err != nil { + return fmt.Errorf("build update document failed: %w", err) + } + doc.DeletedTime = deleteTime + return s.updateDocuments(index.Documents{*doc}) + } + + // if the lastest property in shard is bigger(or equals) than the repaired property, + // then the repaired process should be stopped. + // only deleted the older properties(exclude lastest one). 
+ if olderProperties[len(olderProperties)-1].timestamp >= property.Metadata.ModRevision { + var documents []index.Document + // if the latest property mod time is equals to the repaired property mod time, + // but the delete time is different, then we need to update the latest property + if olderProperties[len(olderProperties)-1].timestamp == property.Metadata.ModRevision && + olderProperties[len(olderProperties)-1].deleteTime != deleteTime { + var updateSelfDoc *index.Document + updateSelfDoc, err = s.buildUpdateDocument(id, property) + if err != nil { + return fmt.Errorf("build update self document failed: %w", err) + } + updateSelfDoc.DeletedTime = deleteTime + documents = append(documents, *updateSelfDoc) + } + if len(olderProperties) > 1 { + docIDList := s.buildNotDeletedDocIDList(olderProperties[0 : len(olderProperties)-1]) + var deletedDocuments []index.Document + deletedDocuments, err = s.buildDeleteFromTimeDocuments(ctx, docIDList, time.Now().UnixNano()) + if err != nil { + return fmt.Errorf("build delete documents failed: %w", err) + } + documents = append(documents, deletedDocuments...) + } + return s.updateDocuments(documents) + } + + docIDList := s.buildNotDeletedDocIDList(olderProperties) + deletedDocuments, err := s.buildDeleteFromTimeDocuments(ctx, docIDList, time.Now().UnixNano()) + if err != nil { + return fmt.Errorf("build delete older documents failed: %w", err) + } + // update the property to mark it as delete + updateDoc, err := s.buildUpdateDocument(id, property) + if err != nil { + return fmt.Errorf("build repair document failure: %w", err) + } + // set the delete time(could be zero) to the latest delete time + updateDoc.DeletedTime = deleteTime Review Comment: Please remove the deletedTime from the Document. All fields related to the property should be defined in the property package, like "_source". I'm sorry for not bringing this issue to your attention in the previous PR. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@skywalking.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org