hanahmily commented on code in PR #943:
URL: https://github.com/apache/skywalking-banyandb/pull/943#discussion_r2697184424


##########
pkg/query/logical/measure/measure_plan_distributed.go:
##########
@@ -531,26 +542,54 @@ func (s *pushedDownAggregatedIterator) Current() []*measurev1.DataPoint {
        return []*measurev1.DataPoint{s.dataPoints[s.index-1]}
 }
 
+func (s *pushedDownAggregatedIterator) CurrentShardID() common.ShardID {
+       // In distributed query with aggregation, data comes from multiple shards, so shard ID is not applicable
+       return 0
+}
+
 func (s *pushedDownAggregatedIterator) Close() error {
        return nil
 }
 
-// deduplicateAggregatedDataPoints removes duplicate aggregated results from multiple replicas.
-func deduplicateAggregatedDataPoints(dataPoints []*measurev1.DataPoint, groupByTagsRefs [][]*logical.TagRef) ([]*measurev1.DataPoint, error) {
+// deduplicateAggregatedDataPointsWithShard removes duplicate aggregated results from multiple replicas
+// of the same shard, while preserving results from different shards.
+func deduplicateAggregatedDataPointsWithShard(dataPoints []*measurev1.InternalDataPoint, groupByTagsRefs [][]*logical.TagRef) ([]*measurev1.DataPoint, error) {
        if len(groupByTagsRefs) == 0 {
-               return dataPoints, nil
+               return extractDataPoints(dataPoints), nil
        }
+       // key = hash(shard_id, group_key)
+       // Same shard with same group key will be deduplicated
+       // Different shards with same group key will be preserved
        groupMap := make(map[uint64]struct{})
        result := make([]*measurev1.DataPoint, 0, len(dataPoints))
-       for _, dp := range dataPoints {
-               key, err := formatGroupByKey(dp, groupByTagsRefs)
-               if err != nil {
-                       return nil, err
+       for _, idp := range dataPoints {
+               groupKey, keyErr := formatGroupByKey(idp.DataPoint, groupByTagsRefs)
+               if keyErr != nil {
+                       return nil, keyErr
                }
+               // Include shard_id in key calculation
+               key := hashWithShard(uint64(idp.ShardId), groupKey)
                if _, exists := groupMap[key]; !exists {
                        groupMap[key] = struct{}{}
-                       result = append(result, dp)
+                       result = append(result, idp.DataPoint)
                }
        }
        return result, nil
 }

Review Comment:
   Please add a test case to cover this function.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to