hanahmily commented on code in PR #957:
URL:
https://github.com/apache/skywalking-banyandb/pull/957#discussion_r2757704782
##########
pkg/query/aggregation/aggregation.go:
##########
@@ -114,3 +116,17 @@ func zero[N Number]() N {
var z N
return z
}
+
+// IsDistributedMean checks if the function is a distributed mean function.
+func IsDistributedMean[N Number](f Func[N]) bool {
+ _, ok := f.(*distributedMeanFunc[N])
+ return ok
+}
Review Comment:
```suggestion
```
##########
pkg/query/logical/measure/measure_plan_aggregation.go:
##########
@@ -189,17 +198,38 @@ func (ami *aggGroupIterator[N]) Current()
[]*measurev1.InternalDataPoint {
if resultDp == nil {
return nil
}
- val, err := aggregation.ToFieldValue(ami.aggrFunc.Val())
- if err != nil {
- ami.err = err
- return nil
- }
- resultDp.Fields = []*measurev1.DataPoint_Field{
- {
- Name: ami.aggregationFieldRef.Field.Name,
- Value: val,
- },
+ var fields []*measurev1.DataPoint_Field
+ sumVal, countVal, isDistributedMean :=
aggregation.GetSumCount(ami.aggrFunc)
+ if isDistributedMean {
+ sumFieldVal, sumErr := aggregation.ToFieldValue(sumVal)
+ if sumErr != nil {
+ ami.err = sumErr
+ return nil
+ }
+ countFieldVal, countErr := aggregation.ToFieldValue(countVal)
+ if countErr != nil {
+ ami.err = countErr
+ return nil
+ }
+ sumName, countName :=
distributedMeanFieldNames(ami.aggregationFieldRef.Field.Name)
+ fields = []*measurev1.DataPoint_Field{
+ {Name: sumName, Value: sumFieldVal},
+ {Name: countName, Value: countFieldVal},
+ }
+ } else {
+ val, err := aggregation.ToFieldValue(ami.aggrFunc.Val())
+ if err != nil {
+ ami.err = err
+ return nil
+ }
+ fields = []*measurev1.DataPoint_Field{
+ {
+ Name: ami.aggregationFieldRef.Field.Name,
+ Value: val,
+ },
+ }
Review Comment:
Revert this section.
You should change the aggregationFieldRef field of both aggGroupIterator and
aggAllIterator into a list of fields that can contain "value", "sum", and "count".
Modify unresolvedAggregation.Analyze to generate the aggregationFieldRefs
with "sum" and "count" when aggFunc is the distributed mean.
Summary: Add the specific analysis step only for the distributed mean
function. Do not include any code for this in aggGroupIterator and
aggAllIterator.
##########
pkg/query/logical/measure/measure_plan_distributed.go:
##########
@@ -575,3 +586,107 @@ func hashWithShard(shardID, groupKey uint64) uint64 {
h = (h ^ groupKey) * prime64
return h
}
+
+// mergeMeanAggregation merges mean aggregation results (sum and count) from
multiple data nodes.
+func mergeMeanAggregation(dataPoints []*measurev1.InternalDataPoint, fieldName
string, groupByTagsRefs [][]*logical.TagRef) ([]*measurev1.InternalDataPoint,
error) {
Review Comment:
Delete this function.
The group operation should be performed in `measure_plan_groupby.go`, and the
aggregation operation belongs in `measure_plan_aggregation.go`.
##########
pkg/query/aggregation/aggregation.go:
##########
@@ -114,3 +116,17 @@ func zero[N Number]() N {
var z N
return z
}
+
+// IsDistributedMean checks if the function is a distributed mean function.
+func IsDistributedMean[N Number](f Func[N]) bool {
+ _, ok := f.(*distributedMeanFunc[N])
+ return ok
+}
+
+// GetSumCount returns sum and count if the function is a distributed mean
function.
+func GetSumCount[N Number](f Func[N]) (sum N, count N, ok bool) {
+ if dmf, ok := f.(*distributedMeanFunc[N]); ok {
+ return dmf.sum, dmf.count, true
+ }
+ return zero[N](), zero[N](), false
+}
Review Comment:
```suggestion
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]