hanahmily commented on code in PR #957:
URL: 
https://github.com/apache/skywalking-banyandb/pull/957#discussion_r2757704782


##########
pkg/query/aggregation/aggregation.go:
##########
@@ -114,3 +116,17 @@ func zero[N Number]() N {
        var z N
        return z
 }
+
+// IsDistributedMean checks if the function is a distributed mean function.
+func IsDistributedMean[N Number](f Func[N]) bool {
+       _, ok := f.(*distributedMeanFunc[N])
+       return ok
+}

Review Comment:
   ```suggestion
   ```



##########
pkg/query/logical/measure/measure_plan_aggregation.go:
##########
@@ -189,17 +198,38 @@ func (ami *aggGroupIterator[N]) Current() 
[]*measurev1.InternalDataPoint {
        if resultDp == nil {
                return nil
        }
-       val, err := aggregation.ToFieldValue(ami.aggrFunc.Val())
-       if err != nil {
-               ami.err = err
-               return nil
-       }
-       resultDp.Fields = []*measurev1.DataPoint_Field{
-               {
-                       Name:  ami.aggregationFieldRef.Field.Name,
-                       Value: val,
-               },
+       var fields []*measurev1.DataPoint_Field
+       sumVal, countVal, isDistributedMean := 
aggregation.GetSumCount(ami.aggrFunc)
+       if isDistributedMean {
+               sumFieldVal, sumErr := aggregation.ToFieldValue(sumVal)
+               if sumErr != nil {
+                       ami.err = sumErr
+                       return nil
+               }
+               countFieldVal, countErr := aggregation.ToFieldValue(countVal)
+               if countErr != nil {
+                       ami.err = countErr
+                       return nil
+               }
+               sumName, countName := 
distributedMeanFieldNames(ami.aggregationFieldRef.Field.Name)
+               fields = []*measurev1.DataPoint_Field{
+                       {Name: sumName, Value: sumFieldVal},
+                       {Name: countName, Value: countFieldVal},
+               }
+       } else {
+               val, err := aggregation.ToFieldValue(ami.aggrFunc.Val())
+               if err != nil {
+                       ami.err = err
+                       return nil
+               }
+               fields = []*measurev1.DataPoint_Field{
+                       {
+                               Name:  ami.aggregationFieldRef.Field.Name,
+                               Value: val,
+                       },
+               }

Review Comment:
   Revert this section. 
   
   You should change aggGroupIterator/aggAllIterator.aggregationFieldRef into a 
list with multiple fields that can contain "value", "sum", and "count". 
   
   Modify unresolvedAggregation.Analyze to generate the aggregationFieldRefs 
with "sum" and "count" when aggFunc is the distributed mean. 
   
   Summary: Add the specific analysis step only for the distributed mean 
function. Do not include any code for this in either aggGroupIterator or 
aggAllIterator.



##########
pkg/query/logical/measure/measure_plan_distributed.go:
##########
@@ -575,3 +586,107 @@ func hashWithShard(shardID, groupKey uint64) uint64 {
        h = (h ^ groupKey) * prime64
        return h
 }
+
+// mergeMeanAggregation merges mean aggregation results (sum and count) from 
multiple data nodes.
+func mergeMeanAggregation(dataPoints []*measurev1.InternalDataPoint, fieldName 
string, groupByTagsRefs [][]*logical.TagRef) ([]*measurev1.InternalDataPoint, 
error) {

Review Comment:
   Delete this function. 
   
   The group operation should be performed in `measure_plan_groupby.go`; the 
aggregation operation belongs in `measure_plan_aggregation.go`.



##########
pkg/query/aggregation/aggregation.go:
##########
@@ -114,3 +116,17 @@ func zero[N Number]() N {
        var z N
        return z
 }
+
+// IsDistributedMean checks if the function is a distributed mean function.
+func IsDistributedMean[N Number](f Func[N]) bool {
+       _, ok := f.(*distributedMeanFunc[N])
+       return ok
+}
+
+// GetSumCount returns sum and count if the function is a distributed mean 
function.
+func GetSumCount[N Number](f Func[N]) (sum N, count N, ok bool) {
+       if dmf, ok := f.(*distributedMeanFunc[N]); ok {
+               return dmf.sum, dmf.count, true
+       }
+       return zero[N](), zero[N](), false
+}

Review Comment:
   ```suggestion
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to