hanahmily commented on code in PR #861:
URL: 
https://github.com/apache/skywalking-banyandb/pull/861#discussion_r2589450013


##########
pkg/index/inverted/query.go:
##########
@@ -677,6 +677,7 @@ func BuildPropertyQuery(req *propertyv1.QueryRequest, 
groupField, idField string
                bq.AddMust(iq)
                bn.Append(in)
        }
+

Review Comment:
   ```suggestion
   ```



##########
banyand/liaison/grpc/property.go:
##########
@@ -439,54 +442,237 @@ func (ps *propertyServer) Query(ctx context.Context, req 
*propertyv1.QueryReques
        if err != nil {
                return nil, err
        }
-       res := make(map[string]*propertyWithCount)
-       for n, nodeWithProperties := range nodeProperties {
-               for _, propertyMetadata := range nodeWithProperties {
-                       entity := 
propertypkg.GetEntity(propertyMetadata.Property)
-                       cur, ok := res[entity]
-                       if !ok {
-                               res[entity] = 
newPropertyWithCounts(propertyMetadata, n)
-                               continue
-                       }
-                       switch {
-                       case cur.Metadata.ModRevision < 
propertyMetadata.Metadata.ModRevision: // newer revision
-                               res[entity] = 
newPropertyWithCounts(propertyMetadata, n)
-                       case cur.Metadata.ModRevision == 
propertyMetadata.Metadata.ModRevision: // same revision
-                               cur.addExistNode(n)
-                       }
-               }
-       }
-       if len(res) == 0 {
+       if len(nodeProperties) == 0 {
                return &propertyv1.QueryResponse{Properties: nil, Trace: 
trace}, nil
        }
-       properties := make([]*propertyv1.Property, 0, len(res))
-       for entity, p := range res {
-               if err := ps.repairPropertyIfNeed(entity, p, groups); err != 
nil {
+
+       var properties []*propertyWithCount
+
+       // Choose processing path based on whether ordering is requested
+       if req.OrderBy != nil && req.OrderBy.TagName != "" {
+               // Sorted query: Use unified k-way merge with binary search 
insertion
+               properties = ps.sortedQueryWithDedup(nodeProperties, req)
+       } else {
+               // No ordering: Simple dedup without sorting
+               properties = ps.simpleDedupWithoutSort(nodeProperties)
+       }
+
+       result := make([]*propertyv1.Property, 0, len(properties))
+       for _, property := range properties {
+               if err := ps.repairPropertyIfNeed(property.entity, property, 
groups); err != nil {
                        ps.log.Warn().Msgf("failed to repair properties when 
query: %v", err)
                }
-
                // ignore deleted property in the query
-               if p.deletedTime > 0 {
+               if property.deletedTime > 0 {
                        continue
                }
                if len(req.TagProjection) > 0 {
                        var tags []*modelv1.Tag
-                       for _, tag := range p.Tags {
+                       for _, tag := range property.Tags {
                                for _, tp := range req.TagProjection {
                                        if tp == tag.Key {
                                                tags = append(tags, tag)
                                                break
                                        }
                                }
                        }
-                       p.Tags = tags
+                       property.Tags = tags
+               }
+               result = append(result, property.Property)
+       }
+
+       // Apply limit
+       if len(result) > int(req.Limit) {
+               result = result[:req.Limit]
+       }
+       return &propertyv1.QueryResponse{Properties: result, Trace: trace}, nil
+}
+
+// sortedQueryWithDedup implements unified sorted query handling for both 
single and multi-node scenarios.
+// It uses k-way merge with binary search insertion to maintain sorted order 
efficiently.
+func (ps *propertyServer) sortedQueryWithDedup(
+       nodeProperties map[string][]*propertyWithMetadata,
+       req *propertyv1.QueryRequest,
+) []*propertyWithCount {
+       // Initialize k-way merge iterators
+       iters := make([]sortpkg.Iterator[*propertyWithMetadata], 0, 
len(nodeProperties))
+       for _, props := range nodeProperties {
+               if len(props) > 0 {
+                       iters = append(iters, 
newPropertyWithMetadataIterator(props))
+               }
+       }
+
+       if len(iters) == 0 {
+               return nil
+       }
+
+       // Create heap-based merge iterator
+       isDesc := req.OrderBy.Sort == modelv1.Sort_SORT_DESC
+       mergeIter := sortpkg.NewItemIter(iters, isDesc)
+       defer mergeIter.Close()
+
+       // Initialize deduplication state
+       // seenIDs tracks entity -> propertyWithCount for deduplication
+       seenIDs := make(map[string]*propertyWithCount)
+       // resultBuffer maintains sorted list of unique properties
+       resultBuffer := make([]*propertyWithCount, 0, req.Limit)
+
+       // Process items from k-way merge
+       for mergeIter.Next() {
+               p := mergeIter.Val()
+               entity := propertypkg.GetEntity(p.Property)
+
+               // Check if we've seen this entity before
+               if existingCount, seen := seenIDs[entity]; seen {
+                       // Same modRevision - accumulate node
+                       if p.Metadata.ModRevision == 
existingCount.Metadata.ModRevision {
+                               existingCount.addExistNode(p.node)
+                               continue
+                       }
+
+                       // Older modRevision - skip
+                       if p.Metadata.ModRevision < 
existingCount.Metadata.ModRevision {
+                               continue
+                       }
+
+                       // Newer modRevision - replace old entry
+                       // Find and remove old entry from resultBuffer using 
binary search
+                       oldIndex := ps.findPropertyInBuffer(resultBuffer, 
existingCount, isDesc)
+                       if oldIndex >= 0 && oldIndex < len(resultBuffer) {
+                               // Remove old entry
+                               resultBuffer = append(resultBuffer[:oldIndex], 
resultBuffer[oldIndex+1:]...)
+                       }
+
+                       // Create new propertyWithCount for the newer revision
+                       newCount := newPropertyWithCounts(p, entity, p.node)
+                       seenIDs[entity] = newCount
+
+                       // Insert new entry in sorted position using binary 
search
+                       insertPos := ps.findInsertPosition(resultBuffer, 
newCount, isDesc)
+                       resultBuffer = ps.insertAtPosition(resultBuffer, 
newCount, insertPos)
+
+                       continue
+               }
+
+               // New entity - first time seeing this entity
+               // Create propertyWithCount and add to seenIDs
+               propCount := newPropertyWithCounts(p, entity, p.node)
+               seenIDs[entity] = propCount
+
+               // Insert into resultBuffer at correct sorted position
+               insertPos := ps.findInsertPosition(resultBuffer, propCount, 
isDesc)
+               resultBuffer = ps.insertAtPosition(resultBuffer, propCount, 
insertPos)

Review Comment:
   Merge them into a single function. They can only be used once here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to