hanahmily commented on code in PR #686:
URL: 
https://github.com/apache/skywalking-banyandb/pull/686#discussion_r2166641898


##########
banyand/property/shard.go:
##########
@@ -244,13 +248,102 @@ func (s *shard) search(ctx context.Context, indexQuery 
index.Query, limit int,
                }
                data = append(data, &queryProperty{
                        id:         s.Key.EntityValues,
+                       timestamp:  s.Timestamp,
                        source:     bytes,
                        deleteTime: deleteTime,
                })
        }
        return data, nil
 }
 
+func (s *shard) repair(ctx context.Context, id []byte, property 
*propertyv1.Property, deleteTime int64) error {
+       iq, err := inverted.BuildPropertyQuery(&propertyv1.QueryRequest{
+               Groups: []string{property.Metadata.Group},
+               Name:   property.Metadata.Name,
+               Ids:    []string{property.Id},
+       }, groupField, entityID)
+       if err != nil {
+               return fmt.Errorf("build property query failure: %w", err)
+       }
+       olderProperties, err := s.search(ctx, iq, 100)
+       if err != nil {
+               return fmt.Errorf("query older properties failed: %w", err)
+       }
+       sort.Sort(queryPropertySlice(olderProperties))
+       // if there no older properties, we can insert the latest document.
+       if len(olderProperties) == 0 {
+               var doc *index.Document
+               doc, err = s.buildUpdateDocument(id, property)
+               if err != nil {
+                       return fmt.Errorf("build update document failed: %w", 
err)
+               }
+               doc.DeletedTime = deleteTime
+               return s.updateDocuments(index.Documents{*doc})
+       }
+
+       // if the lastest property in shard is bigger(or equals) than the 
repaired property,
+       // then the repaired process should be stopped.
+       // only deleted the older properties(exclude lastest one).
+       if olderProperties[len(olderProperties)-1].timestamp >= 
property.Metadata.ModRevision {
+               var documents []index.Document
+               // if the latest property mod time is equals to the repaired 
property mod time,
+               // but the delete time is different, then we need to update the 
latest property
+               if olderProperties[len(olderProperties)-1].timestamp == 
property.Metadata.ModRevision &&
+                       olderProperties[len(olderProperties)-1].deleteTime != 
deleteTime {
+                       var updateSelfDoc *index.Document
+                       updateSelfDoc, err = s.buildUpdateDocument(id, property)
+                       if err != nil {
+                               return fmt.Errorf("build update self document 
failed: %w", err)
+                       }
+                       updateSelfDoc.DeletedTime = deleteTime
+                       documents = append(documents, *updateSelfDoc)
+               }
+               if len(olderProperties) > 1 {
+                       docIDList := 
s.buildNotDeletedDocIDList(olderProperties[0 : len(olderProperties)-1])
+                       var deletedDocuments []index.Document
+                       deletedDocuments, err = 
s.buildDeleteFromTimeDocuments(ctx, docIDList, time.Now().UnixNano())
+                       if err != nil {
+                               return fmt.Errorf("build delete documents 
failed: %w", err)
+                       }
+                       documents = append(documents, deletedDocuments...)
+               }
+               return s.updateDocuments(documents)

Review Comment:
   ```suggestion
                return nil
   ```
   
   In this branch, the data node has a newer version than liaison. I recommand 
that you stop trying to fix it in this round because you can not repair based 
on the expired property.



##########
banyand/property/shard.go:
##########
@@ -244,13 +248,102 @@ func (s *shard) search(ctx context.Context, indexQuery 
index.Query, limit int,
                }
                data = append(data, &queryProperty{
                        id:         s.Key.EntityValues,
+                       timestamp:  s.Timestamp,
                        source:     bytes,
                        deleteTime: deleteTime,
                })
        }
        return data, nil
 }
 
+func (s *shard) repair(ctx context.Context, id []byte, property 
*propertyv1.Property, deleteTime int64) error {
+       iq, err := inverted.BuildPropertyQuery(&propertyv1.QueryRequest{
+               Groups: []string{property.Metadata.Group},
+               Name:   property.Metadata.Name,
+               Ids:    []string{property.Id},
+       }, groupField, entityID)
+       if err != nil {
+               return fmt.Errorf("build property query failure: %w", err)
+       }
+       olderProperties, err := s.search(ctx, iq, 100)
+       if err != nil {
+               return fmt.Errorf("query older properties failed: %w", err)
+       }
+       sort.Sort(queryPropertySlice(olderProperties))
+       // if there no older properties, we can insert the latest document.
+       if len(olderProperties) == 0 {
+               var doc *index.Document
+               doc, err = s.buildUpdateDocument(id, property)
+               if err != nil {
+                       return fmt.Errorf("build update document failed: %w", 
err)
+               }
+               doc.DeletedTime = deleteTime
+               return s.updateDocuments(index.Documents{*doc})
+       }
+
+       // if the lastest property in shard is bigger(or equals) than the 
repaired property,
+       // then the repaired process should be stopped.
+       // only deleted the older properties(exclude lastest one).

Review Comment:
   ```suggestion
   ```
   
   



##########
banyand/property/shard.go:
##########
@@ -233,13 +247,103 @@ func (s *shard) search(ctx context.Context, indexQuery 
index.Query, limit int,
                        deleteTime = convert.BytesToInt64(s.Fields[deleteField])
                }
                data = append(data, &queryProperty{
+                       id:         s.Key.EntityValues,
+                       timestamp:  s.Timestamp,
                        source:     bytes,
                        deleteTime: deleteTime,
                })
        }
        return data, nil
 }
 
+func (s *shard) repair(ctx context.Context, id []byte, property 
*propertyv1.Property, deleteTime int64) error {
+       iq, err := inverted.BuildPropertyQuery(&propertyv1.QueryRequest{
+               Groups: []string{property.Metadata.Group},
+               Name:   property.Metadata.Name,
+               Ids:    []string{property.Id},
+       }, groupField, entityID)
+       if err != nil {
+               return fmt.Errorf("build property query failure: %w", err)
+       }
+       olderProperties, err := s.search(ctx, iq, 100)
+       if err != nil {
+               return fmt.Errorf("query older properties failed: %w", err)
+       }
+       sort.Sort(queryPropertySlice(olderProperties))
+       // if there no older properties, we can insert the latest document.
+       if len(olderProperties) == 0 {
+               var doc *index.Document
+               doc, err = s.buildUpdateDocument(id, property)
+               if err != nil {
+                       return fmt.Errorf("build update document failed: %w", 
err)
+               }
+               doc.DeletedTime = deleteTime
+               return s.updateDocuments(index.Documents{*doc})
+       }
+
+       // if the lastest property in shard is bigger(or equals) than the 
repaired property,
+       // then the repaired process should be stopped.
+       // only deleted the older properties(exclude lastest one).
+       if olderProperties[len(olderProperties)-1].timestamp >= 
property.Metadata.ModRevision {
+               var documents []index.Document
+               // if the latest property mod time is equals to the repaired 
property mod time,
+               // but the delete time is different, then we need to update the 
latest property
+               if olderProperties[len(olderProperties)-1].timestamp == 
property.Metadata.ModRevision &&
+                       olderProperties[len(olderProperties)-1].deleteTime != 
deleteTime {
+                       var updateSelfDoc *index.Document
+                       updateSelfDoc, err = s.buildUpdateDocument(id, property)
+                       if err != nil {
+                               return fmt.Errorf("build update self document 
failed: %w", err)
+                       }
+                       updateSelfDoc.DeletedTime = deleteTime
+                       documents = append(documents, *updateSelfDoc)
+               }
+               if len(olderProperties) > 1 {
+                       docIDList := 
s.buildNotDeletedDocIDList(olderProperties[0 : len(olderProperties)-1])
+                       var deletedDocuments []index.Document
+                       deletedDocuments, err = 
s.buildDeleteFromTimeDocuments(ctx, docIDList, time.Now().UnixNano())
+                       if err != nil {
+                               return fmt.Errorf("build delete documents 
failed: %w", err)
+                       }
+                       documents = append(documents, deletedDocuments...)
+               }
+               return s.updateDocuments(documents)
+       }
+
+       docIDList := s.buildNotDeletedDocIDList(olderProperties)
+       deletedDocuments, err := s.buildDeleteFromTimeDocuments(ctx, docIDList, 
time.Now().UnixNano())
+       if err != nil {
+               return fmt.Errorf("build delete older documents failed: %w", 
err)
+       }
+       // update the property to mark it as delete
+       updateDoc, err := s.buildUpdateDocument(id, property)
+       if err != nil {
+               return fmt.Errorf("build repair document failure: %w", err)
+       }
+       // set the delete time(could be zero) to the latest delete time
+       updateDoc.DeletedTime = deleteTime

Review Comment:
   Please remove the deletedTime from the Document. All fields related to the 
property should be defined in the property package, like "_source". I'm sorry 
for not bringing this issue to your attention in the previous PR. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to