hanahmily commented on code in PR #861:
URL:
https://github.com/apache/skywalking-banyandb/pull/861#discussion_r2589450013
##########
pkg/index/inverted/query.go:
##########
@@ -677,6 +677,7 @@ func BuildPropertyQuery(req *propertyv1.QueryRequest,
groupField, idField string
bq.AddMust(iq)
bn.Append(in)
}
+
Review Comment:
```suggestion
```
##########
banyand/liaison/grpc/property.go:
##########
@@ -439,54 +442,237 @@ func (ps *propertyServer) Query(ctx context.Context, req
*propertyv1.QueryReques
if err != nil {
return nil, err
}
- res := make(map[string]*propertyWithCount)
- for n, nodeWithProperties := range nodeProperties {
- for _, propertyMetadata := range nodeWithProperties {
- entity :=
propertypkg.GetEntity(propertyMetadata.Property)
- cur, ok := res[entity]
- if !ok {
- res[entity] =
newPropertyWithCounts(propertyMetadata, n)
- continue
- }
- switch {
- case cur.Metadata.ModRevision <
propertyMetadata.Metadata.ModRevision: // newer revision
- res[entity] =
newPropertyWithCounts(propertyMetadata, n)
- case cur.Metadata.ModRevision ==
propertyMetadata.Metadata.ModRevision: // same revision
- cur.addExistNode(n)
- }
- }
- }
- if len(res) == 0 {
+ if len(nodeProperties) == 0 {
return &propertyv1.QueryResponse{Properties: nil, Trace:
trace}, nil
}
- properties := make([]*propertyv1.Property, 0, len(res))
- for entity, p := range res {
- if err := ps.repairPropertyIfNeed(entity, p, groups); err !=
nil {
+
+ var properties []*propertyWithCount
+
+ // Choose processing path based on whether ordering is requested
+ if req.OrderBy != nil && req.OrderBy.TagName != "" {
+ // Sorted query: Use unified k-way merge with binary search
insertion
+ properties = ps.sortedQueryWithDedup(nodeProperties, req)
+ } else {
+ // No ordering: Simple dedup without sorting
+ properties = ps.simpleDedupWithoutSort(nodeProperties)
+ }
+
+ result := make([]*propertyv1.Property, 0, len(properties))
+ for _, property := range properties {
+ if err := ps.repairPropertyIfNeed(property.entity, property,
groups); err != nil {
ps.log.Warn().Msgf("failed to repair properties when
query: %v", err)
}
-
// ignore deleted property in the query
- if p.deletedTime > 0 {
+ if property.deletedTime > 0 {
continue
}
if len(req.TagProjection) > 0 {
var tags []*modelv1.Tag
- for _, tag := range p.Tags {
+ for _, tag := range property.Tags {
for _, tp := range req.TagProjection {
if tp == tag.Key {
tags = append(tags, tag)
break
}
}
}
- p.Tags = tags
+ property.Tags = tags
+ }
+ result = append(result, property.Property)
+ }
+
+ // Apply limit
+ if len(result) > int(req.Limit) {
+ result = result[:req.Limit]
+ }
+ return &propertyv1.QueryResponse{Properties: result, Trace: trace}, nil
+}
+
+// sortedQueryWithDedup implements unified sorted query handling for both
single and multi-node scenarios.
+// It uses k-way merge with binary search insertion to maintain sorted order
efficiently.
+func (ps *propertyServer) sortedQueryWithDedup(
+ nodeProperties map[string][]*propertyWithMetadata,
+ req *propertyv1.QueryRequest,
+) []*propertyWithCount {
+ // Initialize k-way merge iterators
+ iters := make([]sortpkg.Iterator[*propertyWithMetadata], 0,
len(nodeProperties))
+ for _, props := range nodeProperties {
+ if len(props) > 0 {
+ iters = append(iters,
newPropertyWithMetadataIterator(props))
+ }
+ }
+
+ if len(iters) == 0 {
+ return nil
+ }
+
+ // Create heap-based merge iterator
+ isDesc := req.OrderBy.Sort == modelv1.Sort_SORT_DESC
+ mergeIter := sortpkg.NewItemIter(iters, isDesc)
+ defer mergeIter.Close()
+
+ // Initialize deduplication state
+ // seenIDs tracks entity -> propertyWithCount for deduplication
+ seenIDs := make(map[string]*propertyWithCount)
+ // resultBuffer maintains sorted list of unique properties
+ resultBuffer := make([]*propertyWithCount, 0, req.Limit)
+
+ // Process items from k-way merge
+ for mergeIter.Next() {
+ p := mergeIter.Val()
+ entity := propertypkg.GetEntity(p.Property)
+
+ // Check if we've seen this entity before
+ if existingCount, seen := seenIDs[entity]; seen {
+ // Same modRevision - accumulate node
+ if p.Metadata.ModRevision ==
existingCount.Metadata.ModRevision {
+ existingCount.addExistNode(p.node)
+ continue
+ }
+
+ // Older modRevision - skip
+ if p.Metadata.ModRevision <
existingCount.Metadata.ModRevision {
+ continue
+ }
+
+ // Newer modRevision - replace old entry
+ // Find and remove old entry from resultBuffer using
binary search
+ oldIndex := ps.findPropertyInBuffer(resultBuffer,
existingCount, isDesc)
+ if oldIndex >= 0 && oldIndex < len(resultBuffer) {
+ // Remove old entry
+ resultBuffer = append(resultBuffer[:oldIndex],
resultBuffer[oldIndex+1:]...)
+ }
+
+ // Create new propertyWithCount for the newer revision
+ newCount := newPropertyWithCounts(p, entity, p.node)
+ seenIDs[entity] = newCount
+
+ // Insert new entry in sorted position using binary
search
+ insertPos := ps.findInsertPosition(resultBuffer,
newCount, isDesc)
+ resultBuffer = ps.insertAtPosition(resultBuffer,
newCount, insertPos)
+
+ continue
+ }
+
+ // New entity - first time seeing this entity
+ // Create propertyWithCount and add to seenIDs
+ propCount := newPropertyWithCounts(p, entity, p.node)
+ seenIDs[entity] = propCount
+
+ // Insert into resultBuffer at correct sorted position
+ insertPos := ps.findInsertPosition(resultBuffer, propCount,
isDesc)
+ resultBuffer = ps.insertAtPosition(resultBuffer, propCount,
insertPos)
Review Comment:
Merge them into a single function. They only need to be used once here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]