hanahmily commented on code in PR #932:
URL: https://github.com/apache/skywalking-banyandb/pull/932#discussion_r2680717655


##########
pkg/query/logical/measure/measure_plan_distributed.go:
##########
@@ -535,9 +535,7 @@ func (s *pushedDownAggregatedIterator) Close() error {
        return nil
 }
 
-// deduplicateAggregatedDataPoints removes duplicate aggregated results from multiple replicas
-// by keeping only one data point per group. Since replicas hold identical data, aggregates
-// for the same group are identical across replicas.
+// deduplicateAggregatedDataPoints removes duplicate aggregated results from multiple replicas.
 func deduplicateAggregatedDataPoints(dataPoints []*measurev1.DataPoint, groupByTagsRefs [][]*logical.TagRef) ([]*measurev1.DataPoint, error) {

Review Comment:
   The function `deduplicateAggregatedDataPoints` already deduplicates the data, but its deduplication logic is too coarse.
   
   It deduplicates only by the group-by key. That correctly removes duplicates across replicas of the same shard, but rows that share a group key must be kept when they come from different shards.
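   A minimal Go sketch of the keying this implies: deduplicate by the (shard, group) pair instead of the group key alone. It assumes each aggregated point carries the ID of the shard that produced it (for example, something like the `shard_id` field proposed in the later comment). The types `dataPoint` and `dedupKey` and the function `dedupByShardAndGroup` are hypothetical simplifications, not `measurev1.DataPoint` or the PR's actual code.

   ```go
   package main

   import "fmt"

   // dataPoint is a hypothetical, simplified stand-in for an aggregated
   // measurev1.DataPoint that also records which shard produced it.
   type dataPoint struct {
   	ShardID  uint64
   	GroupKey string // hypothetical: group-by tag values joined into one key
   	Value    int64  // partial aggregate computed on a single shard
   }

   // dedupKey identifies one group within one shard.
   type dedupKey struct {
   	shardID  uint64
   	groupKey string
   }

   // dedupByShardAndGroup keeps the first point seen for each (shard, group)
   // pair. Replicas of one shard return identical points, so they collapse;
   // the same group key from different shards survives because the shard
   // IDs differ.
   func dedupByShardAndGroup(points []dataPoint) []dataPoint {
   	seen := make(map[dedupKey]struct{}, len(points))
   	out := make([]dataPoint, 0, len(points))
   	for _, p := range points {
   		k := dedupKey{shardID: p.ShardID, groupKey: p.GroupKey}
   		if _, ok := seen[k]; ok {
   			continue // duplicate from another replica of the same shard
   		}
   		seen[k] = struct{}{}
   		out = append(out, p)
   	}
   	return out
   }

   func main() {
   	points := []dataPoint{
   		{ShardID: 0, GroupKey: "svc-a", Value: 10}, // shard 0, replica 1
   		{ShardID: 0, GroupKey: "svc-a", Value: 10}, // shard 0, replica 2
   		{ShardID: 1, GroupKey: "svc-a", Value: 7},  // shard 1
   	}
   	fmt.Println(dedupByShardAndGroup(points))
   }
   ```

   Running it prints `[{0 svc-a 10} {1 svc-a 7}]`: the second replica of shard 0 is dropped, while shard 1's `svc-a` row is kept. Keying by group alone would have kept only the first row and lost shard 1's contribution.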
   
   



##########
pkg/query/logical/measure/measure_analyzer.go:
##########
@@ -180,12 +181,18 @@ func DistributedAnalyze(criteria *measurev1.QueryRequest, ss []logical.Schema) (
        }
 
        if criteria.GetAgg() != nil {
-               plan = newUnresolvedAggregation(plan,
-                       logical.NewField(criteria.GetAgg().GetFieldName()),
-                       criteria.GetAgg().GetFunction(),
-                       criteria.GetGroupBy() != nil,
-               )
-               pushedLimit = math.MaxInt
+               // If needCompletePushDownAgg is true and has GROUP BY, skip aggregation plan

Review Comment:
   Given my comment on `deduplicateAggregatedDataPoints`, the current logic here should remain unchanged.
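   Because rows with the same group key from different shards survive deduplication, the liaison still has to aggregate them into one result per group. A minimal Go sketch of that merge, assuming each shard returns a partial sum and count per group; `partial`, `groupResult`, and `mergeAcrossShards` are hypothetical names, not the BanyanDB plan code.

   ```go
   package main

   import (
   	"fmt"
   	"sort"
   )

   // partial is a hypothetical per-shard partial aggregate left after replica
   // deduplication (at most one entry per shard and group).
   type partial struct {
   	ShardID  uint64
   	GroupKey string
   	Sum      int64
   	Count    int64
   }

   // groupResult is the merged aggregate for one group across all shards.
   type groupResult struct {
   	Sum   int64
   	Count int64
   }

   // Mean derives the average from the merged sum and count. Averaging the
   // per-shard means instead would give every shard equal weight regardless
   // of how many points it aggregated.
   func (g groupResult) Mean() float64 {
   	if g.Count == 0 {
   		return 0
   	}
   	return float64(g.Sum) / float64(g.Count)
   }

   // mergeAcrossShards combines partials that share a group key but come
   // from different shards. SUM and COUNT merge by addition.
   func mergeAcrossShards(parts []partial) map[string]groupResult {
   	merged := make(map[string]groupResult, len(parts))
   	for _, p := range parts {
   		g := merged[p.GroupKey]
   		g.Sum += p.Sum
   		g.Count += p.Count
   		merged[p.GroupKey] = g
   	}
   	return merged
   }

   func main() {
   	parts := []partial{
   		{ShardID: 0, GroupKey: "svc-a", Sum: 10, Count: 2},
   		{ShardID: 1, GroupKey: "svc-a", Sum: 7, Count: 1},
   		{ShardID: 1, GroupKey: "svc-b", Sum: 4, Count: 4},
   	}
   	merged := mergeAcrossShards(parts)

   	// Print groups in a stable order; Go map iteration order is random.
   	keys := make([]string, 0, len(merged))
   	for k := range merged {
   		keys = append(keys, k)
   	}
   	sort.Strings(keys)
   	for _, k := range keys {
   		g := merged[k]
   		fmt.Printf("%s sum=%d count=%d mean=%.2f\n", k, g.Sum, g.Count, g.Mean())
   	}
   }
   ```

   Running it prints `svc-a sum=17 count=3 mean=5.67` and `svc-b sum=4 count=4 mean=1.00`. Skipping the liaison-side aggregation would instead return two separate `svc-a` rows.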



##########
pkg/query/logical/measure/measure_plan_distributed.go:
##########
@@ -535,9 +535,7 @@ func (s *pushedDownAggregatedIterator) Close() error {
        return nil
 }
 
-// deduplicateAggregatedDataPoints removes duplicate aggregated results from multiple replicas
-// by keeping only one data point per group. Since replicas hold identical data, aggregates
-// for the same group are identical across replicas.
+// deduplicateAggregatedDataPoints removes duplicate aggregated results from multiple replicas.
 func deduplicateAggregatedDataPoints(dataPoints []*measurev1.DataPoint, groupByTagsRefs [][]*logical.TagRef) ([]*measurev1.DataPoint, error) {

Review Comment:
   You would need to create some new protobuf messages and an RPC:
   
   ```proto
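   message InternalDataPoint {
     DataPoint data_point = 1;
     // ID of the shard that produced data_point, so that results from
     // replicas of one shard can be deduplicated while results from
     // different shards are kept.
     uint64 shard_id = 2;
   }
   
   message InternalQueryResponse {
     repeated InternalDataPoint data_points = 1;
     common.v1.Trace trace = 2;
   }
   
   message InternalQueryRequest {
     QueryRequest query_request = 1;
   }
   
   rpc InternalQuery(InternalQueryRequest) returns (InternalQueryResponse);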
   ```



-- 
This is an automated message from the Apache Git Service.