hanahmily commented on code in PR #957:
URL: https://github.com/apache/skywalking-banyandb/pull/957#discussion_r2725987062


##########
pkg/query/logical/measure/measure_analyzer.go:
##########
@@ -162,7 +177,8 @@ func DistributedAnalyze(criteria *measurev1.QueryRequest, 
ss []logical.Schema) (
                (criteria.GetAgg().GetFunction() == 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_MAX ||
                        criteria.GetAgg().GetFunction() == 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_MIN ||
                        criteria.GetAgg().GetFunction() == 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM ||
-                       criteria.GetAgg().GetFunction() == 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_COUNT) &&
+                       criteria.GetAgg().GetFunction() == 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_COUNT ||
+                       criteria.GetAgg().GetFunction() == 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_MEAN) &&

Review Comment:
   Remove all of these function comparisons, since you have now implemented
   the push-down logic for every distributed aggregation function.



##########
pkg/query/logical/measure/measure_analyzer.go:
##########
@@ -119,10 +125,19 @@ func Analyze(criteria *measurev1.QueryRequest, metadata 
[]*commonv1.Metadata, ss
        }
 
        if criteria.GetAgg() != nil {
+               // Check if this is a distributed mean aggregation that needs 
to return sum and count
+               // This happens when the query is pushed down from liaison node 
to data node
+               isDistributedMean := false
+               if isDistributed && criteria.GetAgg().GetFunction() == 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_MEAN &&
+                       criteria.GetGroupBy() != nil &&

Review Comment:
   This suggestion makes sense. You should fix it.



##########
pkg/query/logical/measure/measure_plan_distributed.go:
##########
@@ -568,10 +585,256 @@ func deduplicateAggregatedDataPointsWithShard(dataPoints 
[]*measurev1.InternalDa
        return result, nil
 }
 
+// mergeNonGroupByAggregation merges aggregation results from multiple shards 
when there's no groupBy.
+// For MIN/MAX, takes the min/max value; for SUM/COUNT, sums them up.
+func mergeNonGroupByAggregation(dataPoints []*measurev1.InternalDataPoint, agg 
*measurev1.QueryRequest_Aggregation) []*measurev1.InternalDataPoint {

Review Comment:
   Please use `measure_plan_aggregation.go` and `aggregation.Func` to implement
   the aggregation. Do not break the query plan.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to