hanahmily commented on code in PR #957:
URL: 
https://github.com/apache/skywalking-banyandb/pull/957#discussion_r2750587188


##########
pkg/query/logical/measure/measure_analyzer.go:
##########
@@ -119,10 +125,18 @@ func Analyze(criteria *measurev1.QueryRequest, metadata 
[]*commonv1.Metadata, ss
        }
 
        if criteria.GetAgg() != nil {
+               // Check if this is a distributed mean aggregation that needs 
to return sum and count
+               // This happens when the query is pushed down from liaison node 
to data node
+               isDistributedMean := false
+               if isDistributed && criteria.GetAgg().GetFunction() == 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_MEAN &&
+                       criteria.GetTop() == nil {

Review Comment:
   The Top operation should also work with distributed aggregation, so it should not be excluded from this check.



##########
pkg/query/logical/measure/measure_plan_aggregation.go:
##########
@@ -104,9 +127,39 @@ func newAggregationPlan[N aggregation.Number](gba 
*unresolvedAggregation, prevPl
                aggrFunc:            aggrFunc,
                aggregationFieldRef: fieldRef,
                isGroup:             gba.isGroup,
+               isDistributedMean:   gba.isDistributedMean,
        }, nil
 }
 
+type distributedMeanFunc[N aggregation.Number] struct {

Review Comment:
   Move the implementation to the `aggregation` package.



##########
pkg/query/logical/measure/measure_analyzer.go:
##########
@@ -119,10 +125,18 @@ func Analyze(criteria *measurev1.QueryRequest, metadata 
[]*commonv1.Metadata, ss
        }
 
        if criteria.GetAgg() != nil {
+               // Check if this is a distributed mean aggregation that needs 
to return sum and count
+               // This happens when the query is pushed down from liaison node 
to data node
+               isDistributedMean := false
+               if isDistributed && criteria.GetAgg().GetFunction() == 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_MEAN &&
+                       criteria.GetTop() == nil {
+                       isDistributedMean = true
+               }
                plan = newUnresolvedAggregation(plan,
                        logical.NewField(criteria.GetAgg().GetFieldName()),
                        criteria.GetAgg().GetFunction(),
                        criteria.GetGroupBy() != nil,
+                       isDistributedMean,

Review Comment:
   ```suggestion
   ```
   
   Pass the distributed mean function through the third parameter instead of adding an extra flag.



##########
pkg/query/logical/measure/measure_plan_aggregation.go:
##########
@@ -86,14 +94,29 @@ type aggregationPlan[N aggregation.Number] struct {
        aggrFunc            aggregation.Func[N]
        aggrType            modelv1.AggregationFunction
        isGroup             bool
+       isDistributedMean   bool
+}
+
+// DistributedMeanFunc is used for distributed mean aggregation, returning sum 
and count.
+type DistributedMeanFunc[N aggregation.Number] interface {

Review Comment:
   Change the `Func` interface in `aggregation.go` instead of creating a new one.



##########
pkg/query/logical/measure/measure_analyzer.go:
##########
@@ -175,22 +183,23 @@ func DistributedAnalyze(criteria *measurev1.QueryRequest, 
ss []logical.Schema) (
        }
        pushedLimit := int(limitParameter + criteria.GetOffset())
 
-       if criteria.GetGroupBy() != nil {
-               plan = newUnresolvedGroupBy(plan, groupByTags, false)
-               pushedLimit = math.MaxInt
-       }
+       // When needCompletePushDownAgg is true, aggregation is already done on 
data nodes,
+       // so we should not create groupBy and aggregation plans on liaison node

Review Comment:
   This assumption is not correct. Aggregation should be performed on both the liaison node and the data nodes, so we should still create group-by and aggregation plans for the liaison node.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to