Copilot commented on code in PR #957:
URL: 
https://github.com/apache/skywalking-banyandb/pull/957#discussion_r2720825748


##########
pkg/query/logical/measure/measure_analyzer.go:
##########
@@ -119,10 +125,19 @@ func Analyze(criteria *measurev1.QueryRequest, metadata 
[]*commonv1.Metadata, ss
        }
 
        if criteria.GetAgg() != nil {
+               // Check if this is a distributed mean aggregation that needs 
to return sum and count
+               // This happens when the query is pushed down from liaison node 
to data node
+               isDistributedMean := false
+               if isDistributed && criteria.GetAgg().GetFunction() == 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_MEAN &&
+                       criteria.GetGroupBy() != nil &&

Review Comment:
   `isDistributedMean` is only enabled when `criteria.GetGroupBy() != nil`, but 
`DistributedAnalyze` can push down MEAN even when there is no groupBy. In that 
case the data node will emit a normal MEAN (fieldName), while the liaison merge 
path expects `_sum`/`_count` and will drop results. Consider enabling 
distributed-mean mode for non-groupBy MEAN as well (as long as `Top == nil`), 
or adjust the pushdown conditions accordingly.
   ```suggestion
   
   ```



##########
pkg/query/logical/measure/measure_plan_distributed.go:
##########
@@ -568,10 +585,256 @@ func deduplicateAggregatedDataPointsWithShard(dataPoints 
[]*measurev1.InternalDa
        return result, nil
 }
 
+// mergeNonGroupByAggregation merges aggregation results from multiple shards 
when there's no groupBy.
+// For MIN/MAX, takes the min/max value; for SUM/COUNT, sums them up.
+func mergeNonGroupByAggregation(dataPoints []*measurev1.InternalDataPoint, agg 
*measurev1.QueryRequest_Aggregation) []*measurev1.InternalDataPoint {
+       if len(dataPoints) == 0 {
+               return nil
+       }
+       if len(dataPoints) == 1 {
+               return dataPoints
+       }
+       // Deduplicate by shard_id first (keep the one with highest version)
+       shardMap := make(map[uint32]*measurev1.InternalDataPoint)
+       for _, idp := range dataPoints {
+               existing, exists := shardMap[idp.ShardId]
+               if !exists || idp.GetDataPoint().Version > 
existing.GetDataPoint().Version {
+                       shardMap[idp.ShardId] = idp
+               }
+       }
+       // Now merge results from different shards
+       deduplicatedDps := make([]*measurev1.InternalDataPoint, 0, 
len(shardMap))
+       for _, idp := range shardMap {
+               deduplicatedDps = append(deduplicatedDps, idp)
+       }
+       if len(deduplicatedDps) == 1 {
+               return deduplicatedDps
+       }
+       // Merge aggregation results based on function type
+       fieldName := agg.FieldName
+       var result *measurev1.InternalDataPoint
+       switch agg.Function {
+       case modelv1.AggregationFunction_AGGREGATION_FUNCTION_MIN:
+               result = deduplicatedDps[0]
+               for i := 1; i < len(deduplicatedDps); i++ {
+                       dp := deduplicatedDps[i].GetDataPoint()
+                       resultDp := result.GetDataPoint()
+                       fieldVal := getFieldValue(dp, fieldName)
+                       resultFieldVal := getFieldValue(resultDp, fieldName)
+                       if fieldVal != nil && resultFieldVal != nil {
+                               if compareLess(fieldVal, resultFieldVal) {
+                                       result = deduplicatedDps[i]
+                               }
+                       }
+               }
+       case modelv1.AggregationFunction_AGGREGATION_FUNCTION_MAX:
+               result = deduplicatedDps[0]
+               for i := 1; i < len(deduplicatedDps); i++ {
+                       dp := deduplicatedDps[i].GetDataPoint()
+                       resultDp := result.GetDataPoint()
+                       fieldVal := getFieldValue(dp, fieldName)
+                       resultFieldVal := getFieldValue(resultDp, fieldName)
+                       if fieldVal != nil && resultFieldVal != nil {
+                               if compareGreater(fieldVal, resultFieldVal) {
+                                       result = deduplicatedDps[i]
+                               }
+                       }
+               }
+       case modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM, 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_COUNT:
+               // Sum all values
+               result = deduplicatedDps[0]
+               resultDp := result.GetDataPoint()
+               var sumInt int64
+               var sumFloat float64
+               isFloat := false
+               for _, idp := range deduplicatedDps {
+                       dp := idp.GetDataPoint()
+                       fieldVal := getFieldValue(dp, fieldName)
+                       if fieldVal == nil {
+                               continue
+                       }
+                       switch v := fieldVal.Value.(type) {
+                       case *modelv1.FieldValue_Int:
+                               if !isFloat {
+                                       sumInt += v.Int.Value
+                               } else {
+                                       sumFloat += float64(v.Int.Value)
+                               }
+                       case *modelv1.FieldValue_Float:
+                               if !isFloat {
+                                       isFloat = true
+                                       sumFloat = float64(sumInt) + 
v.Float.Value
+                               } else {
+                                       sumFloat += v.Float.Value
+                               }
+                       }
+               }
+               // Update the result field value
+               for i, field := range resultDp.Fields {
+                       if field.Name == fieldName {
+                               if isFloat {
+                                       resultDp.Fields[i].Value = 
&modelv1.FieldValue{Value: &modelv1.FieldValue_Float{Float: 
&modelv1.Float{Value: sumFloat}}}
+                               } else {
+                                       resultDp.Fields[i].Value = 
&modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: 
sumInt}}}
+                               }
+                               break
+                       }
+               }
+       default:
+               // For other functions, just return the first one (should not 
happen for needCompletePushDownAgg)
+               return deduplicatedDps[:1]
+       }
+       return []*measurev1.InternalDataPoint{result}
+}
+
+// getFieldValue extracts the field value from a data point.
+func getFieldValue(dp *measurev1.DataPoint, fieldName string) 
*modelv1.FieldValue {
+       for _, field := range dp.Fields {
+               if field.Name == fieldName {
+                       return field.Value
+               }
+       }
+       return nil
+}
+
+// compareLess compares two field values and returns true if the first is less 
than the second.
+func compareLess(v1, v2 *modelv1.FieldValue) bool {
+       switch val1 := v1.Value.(type) {
+       case *modelv1.FieldValue_Int:
+               if val2, ok := v2.Value.(*modelv1.FieldValue_Int); ok {
+                       return val1.Int.Value < val2.Int.Value
+               }
+       case *modelv1.FieldValue_Float:
+               if val2, ok := v2.Value.(*modelv1.FieldValue_Float); ok {
+                       return val1.Float.Value < val2.Float.Value
+               }
+       }
+       return false
+}
+
+// compareGreater compares two field values and returns true if the first is 
greater than the second.
+func compareGreater(v1, v2 *modelv1.FieldValue) bool {
+       switch val1 := v1.Value.(type) {
+       case *modelv1.FieldValue_Int:
+               if val2, ok := v2.Value.(*modelv1.FieldValue_Int); ok {
+                       return val1.Int.Value > val2.Int.Value
+               }
+       case *modelv1.FieldValue_Float:
+               if val2, ok := v2.Value.(*modelv1.FieldValue_Float); ok {
+                       return val1.Float.Value > val2.Float.Value
+               }
+       }
+       return false
+}
+
 // hashWithShard combines shard_id and group_key into a single hash.
 func hashWithShard(shardID, groupKey uint64) uint64 {
        h := uint64(offset64)
        h = (h ^ shardID) * prime64
        h = (h ^ groupKey) * prime64
        return h
 }
+
+type meanGroup struct {
+       dataPoint  *measurev1.DataPoint
+       shardID    uint32
+       sumInt     int64
+       countInt   int64
+       sumFloat   float64
+       countFloat float64
+}
+
+// mergeMeanAggregation merges mean aggregation results (sum and count) from 
multiple data nodes.
+func mergeMeanAggregation(dataPoints []*measurev1.InternalDataPoint, fieldName 
string, groupByTagsRefs [][]*logical.TagRef) ([]*measurev1.InternalDataPoint, 
error) {
+       groupMap := make(map[uint64]*meanGroup)
+       for _, idp := range dataPoints {
+               dp := idp.GetDataPoint()
+               var groupKey uint64
+               if len(groupByTagsRefs) > 0 {
+                       var keyErr error
+                       groupKey, keyErr = formatGroupByKey(dp, groupByTagsRefs)
+                       if keyErr != nil {
+                               return nil, keyErr
+                       }
+               }
+               var sumVal int64
+               var countVal int64
+               var sumFloat float64
+               var countFloat float64
+               var isFloat bool
+               var sumFound, countFound bool
+               for _, field := range dp.Fields {
+                       if field.Name == fieldName+"_sum" {
+                               if intVal := field.Value.GetInt(); intVal != 
nil {
+                                       sumVal = intVal.Value
+                                       isFloat = false
+                                       sumFound = true
+                               } else if floatVal := field.Value.GetFloat(); 
floatVal != nil {
+                                       sumFloat = floatVal.Value
+                                       isFloat = true
+                                       sumFound = true
+                               }
+                       } else if field.Name == fieldName+"_count" {
+                               if intVal := field.Value.GetInt(); intVal != 
nil {
+                                       countVal = intVal.Value
+                                       countFound = true
+                               } else if floatVal := field.Value.GetFloat(); 
floatVal != nil {
+                                       countFloat = floatVal.Value
+                                       countFound = true
+                               }
+                       }
+               }
+               if !sumFound || !countFound {
+                       continue
+               }
+               if _, exists := groupMap[groupKey]; !exists {
+                       groupMap[groupKey] = &meanGroup{
+                               dataPoint: &measurev1.DataPoint{
+                                       TagFamilies: dp.TagFamilies,
+                               },
+                               shardID: idp.ShardId,
+                       }
+               }
+               mg := groupMap[groupKey]
+               if isFloat {
+                       mg.sumFloat += sumFloat
+                       mg.countFloat += countFloat
+               } else {
+                       mg.sumInt += sumVal
+                       mg.countInt += countVal
+               }
+       }
+       result := make([]*measurev1.InternalDataPoint, 0, len(groupMap))
+       for _, mg := range groupMap {
+               var meanVal *modelv1.FieldValue
+               switch {
+               case mg.countFloat > 0:
+                       mean := mg.sumFloat / mg.countFloat
+                       meanVal = &modelv1.FieldValue{
+                               Value: &modelv1.FieldValue_Float{
+                                       Float: &modelv1.Float{Value: mean},
+                               },
+                       }

Review Comment:
   `mergeMeanAggregation` computes the final mean as `sum/count` but does not 
apply the same semantics as the existing `aggregation.meanFunc.Val()` (which 
clamps results < 1 to 1 when count > 0). This makes distributed MEAN 
potentially return 0 or <1 values that a non-distributed MEAN would never 
return. Apply the same clamping logic here to keep results consistent across 
execution modes.



##########
pkg/query/logical/measure/measure_plan_distributed.go:
##########
@@ -568,10 +585,256 @@ func deduplicateAggregatedDataPointsWithShard(dataPoints 
[]*measurev1.InternalDa
        return result, nil
 }
 
+// mergeNonGroupByAggregation merges aggregation results from multiple shards 
when there's no groupBy.
+// For MIN/MAX, takes the min/max value; for SUM/COUNT, sums them up.
+func mergeNonGroupByAggregation(dataPoints []*measurev1.InternalDataPoint, agg 
*measurev1.QueryRequest_Aggregation) []*measurev1.InternalDataPoint {
+       if len(dataPoints) == 0 {
+               return nil
+       }
+       if len(dataPoints) == 1 {
+               return dataPoints
+       }
+       // Deduplicate by shard_id first (keep the one with highest version)
+       shardMap := make(map[uint32]*measurev1.InternalDataPoint)
+       for _, idp := range dataPoints {
+               existing, exists := shardMap[idp.ShardId]
+               if !exists || idp.GetDataPoint().Version > 
existing.GetDataPoint().Version {
+                       shardMap[idp.ShardId] = idp
+               }
+       }
+       // Now merge results from different shards
+       deduplicatedDps := make([]*measurev1.InternalDataPoint, 0, 
len(shardMap))
+       for _, idp := range shardMap {
+               deduplicatedDps = append(deduplicatedDps, idp)
+       }
+       if len(deduplicatedDps) == 1 {
+               return deduplicatedDps
+       }
+       // Merge aggregation results based on function type
+       fieldName := agg.FieldName
+       var result *measurev1.InternalDataPoint
+       switch agg.Function {
+       case modelv1.AggregationFunction_AGGREGATION_FUNCTION_MIN:
+               result = deduplicatedDps[0]
+               for i := 1; i < len(deduplicatedDps); i++ {
+                       dp := deduplicatedDps[i].GetDataPoint()
+                       resultDp := result.GetDataPoint()
+                       fieldVal := getFieldValue(dp, fieldName)
+                       resultFieldVal := getFieldValue(resultDp, fieldName)
+                       if fieldVal != nil && resultFieldVal != nil {
+                               if compareLess(fieldVal, resultFieldVal) {
+                                       result = deduplicatedDps[i]
+                               }
+                       }
+               }
+       case modelv1.AggregationFunction_AGGREGATION_FUNCTION_MAX:
+               result = deduplicatedDps[0]
+               for i := 1; i < len(deduplicatedDps); i++ {
+                       dp := deduplicatedDps[i].GetDataPoint()
+                       resultDp := result.GetDataPoint()
+                       fieldVal := getFieldValue(dp, fieldName)
+                       resultFieldVal := getFieldValue(resultDp, fieldName)
+                       if fieldVal != nil && resultFieldVal != nil {
+                               if compareGreater(fieldVal, resultFieldVal) {
+                                       result = deduplicatedDps[i]
+                               }
+                       }
+               }
+       case modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM, 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_COUNT:
+               // Sum all values
+               result = deduplicatedDps[0]
+               resultDp := result.GetDataPoint()
+               var sumInt int64
+               var sumFloat float64
+               isFloat := false
+               for _, idp := range deduplicatedDps {
+                       dp := idp.GetDataPoint()
+                       fieldVal := getFieldValue(dp, fieldName)
+                       if fieldVal == nil {
+                               continue
+                       }
+                       switch v := fieldVal.Value.(type) {
+                       case *modelv1.FieldValue_Int:
+                               if !isFloat {
+                                       sumInt += v.Int.Value
+                               } else {
+                                       sumFloat += float64(v.Int.Value)
+                               }
+                       case *modelv1.FieldValue_Float:
+                               if !isFloat {
+                                       isFloat = true
+                                       sumFloat = float64(sumInt) + 
v.Float.Value
+                               } else {
+                                       sumFloat += v.Float.Value
+                               }
+                       }
+               }
+               // Update the result field value
+               for i, field := range resultDp.Fields {
+                       if field.Name == fieldName {
+                               if isFloat {
+                                       resultDp.Fields[i].Value = 
&modelv1.FieldValue{Value: &modelv1.FieldValue_Float{Float: 
&modelv1.Float{Value: sumFloat}}}
+                               } else {
+                                       resultDp.Fields[i].Value = 
&modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: 
sumInt}}}
+                               }
+                               break
+                       }
+               }
+       default:
+               // For other functions, just return the first one (should not 
happen for needCompletePushDownAgg)
+               return deduplicatedDps[:1]
+       }
+       return []*measurev1.InternalDataPoint{result}
+}
+
+// getFieldValue extracts the field value from a data point.
+func getFieldValue(dp *measurev1.DataPoint, fieldName string) 
*modelv1.FieldValue {
+       for _, field := range dp.Fields {
+               if field.Name == fieldName {
+                       return field.Value
+               }
+       }
+       return nil
+}
+
+// compareLess compares two field values and returns true if the first is less 
than the second.
+func compareLess(v1, v2 *modelv1.FieldValue) bool {
+       switch val1 := v1.Value.(type) {
+       case *modelv1.FieldValue_Int:
+               if val2, ok := v2.Value.(*modelv1.FieldValue_Int); ok {
+                       return val1.Int.Value < val2.Int.Value
+               }
+       case *modelv1.FieldValue_Float:
+               if val2, ok := v2.Value.(*modelv1.FieldValue_Float); ok {
+                       return val1.Float.Value < val2.Float.Value
+               }
+       }
+       return false
+}
+
+// compareGreater compares two field values and returns true if the first is 
greater than the second.
+func compareGreater(v1, v2 *modelv1.FieldValue) bool {
+       switch val1 := v1.Value.(type) {
+       case *modelv1.FieldValue_Int:
+               if val2, ok := v2.Value.(*modelv1.FieldValue_Int); ok {
+                       return val1.Int.Value > val2.Int.Value
+               }
+       case *modelv1.FieldValue_Float:
+               if val2, ok := v2.Value.(*modelv1.FieldValue_Float); ok {
+                       return val1.Float.Value > val2.Float.Value
+               }
+       }
+       return false
+}
+
 // hashWithShard combines shard_id and group_key into a single hash.
 func hashWithShard(shardID, groupKey uint64) uint64 {
        h := uint64(offset64)
        h = (h ^ shardID) * prime64
        h = (h ^ groupKey) * prime64
        return h
 }
+
+type meanGroup struct {
+       dataPoint  *measurev1.DataPoint
+       shardID    uint32
+       sumInt     int64
+       countInt   int64
+       sumFloat   float64
+       countFloat float64
+}
+
+// mergeMeanAggregation merges mean aggregation results (sum and count) from 
multiple data nodes.
+func mergeMeanAggregation(dataPoints []*measurev1.InternalDataPoint, fieldName 
string, groupByTagsRefs [][]*logical.TagRef) ([]*measurev1.InternalDataPoint, 
error) {
+       groupMap := make(map[uint64]*meanGroup)
+       for _, idp := range dataPoints {
+               dp := idp.GetDataPoint()
+               var groupKey uint64
+               if len(groupByTagsRefs) > 0 {
+                       var keyErr error
+                       groupKey, keyErr = formatGroupByKey(dp, groupByTagsRefs)
+                       if keyErr != nil {
+                               return nil, keyErr
+                       }
+               }
+               var sumVal int64
+               var countVal int64
+               var sumFloat float64
+               var countFloat float64
+               var isFloat bool
+               var sumFound, countFound bool
+               for _, field := range dp.Fields {
+                       if field.Name == fieldName+"_sum" {
+                               if intVal := field.Value.GetInt(); intVal != 
nil {
+                                       sumVal = intVal.Value
+                                       isFloat = false
+                                       sumFound = true
+                               } else if floatVal := field.Value.GetFloat(); 
floatVal != nil {
+                                       sumFloat = floatVal.Value
+                                       isFloat = true
+                                       sumFound = true
+                               }
+                       } else if field.Name == fieldName+"_count" {
+                               if intVal := field.Value.GetInt(); intVal != 
nil {
+                                       countVal = intVal.Value
+                                       countFound = true
+                               } else if floatVal := field.Value.GetFloat(); 
floatVal != nil {
+                                       countFloat = floatVal.Value
+                                       countFound = true
+                               }
+                       }
+               }
+               if !sumFound || !countFound {
+                       continue

Review Comment:
   `mergeMeanAggregation` silently `continue`s when a datapoint is missing 
either the `_sum` or `_count` field, which can turn a partially valid response 
into an empty/partial result without surfacing an error. Since 
`mergeMeanAggregation` is only used for MEAN in the pushdown path, it should 
either return an error on missing fields or explicitly handle a fallback 
representation (e.g., already-merged mean).
   ```suggestion
                        return nil, fmt.Errorf("mergeMeanAggregation: datapoint 
missing %s_sum or %s_count", fieldName, fieldName)
   ```



##########
pkg/query/logical/measure/measure_plan_distributed.go:
##########
@@ -568,10 +585,256 @@ func deduplicateAggregatedDataPointsWithShard(dataPoints 
[]*measurev1.InternalDa
        return result, nil
 }
 
+// mergeNonGroupByAggregation merges aggregation results from multiple shards 
when there's no groupBy.
+// For MIN/MAX, takes the min/max value; for SUM/COUNT, sums them up.
+func mergeNonGroupByAggregation(dataPoints []*measurev1.InternalDataPoint, agg 
*measurev1.QueryRequest_Aggregation) []*measurev1.InternalDataPoint {
+       if len(dataPoints) == 0 {
+               return nil
+       }
+       if len(dataPoints) == 1 {
+               return dataPoints
+       }
+       // Deduplicate by shard_id first (keep the one with highest version)
+       shardMap := make(map[uint32]*measurev1.InternalDataPoint)
+       for _, idp := range dataPoints {
+               existing, exists := shardMap[idp.ShardId]
+               if !exists || idp.GetDataPoint().Version > 
existing.GetDataPoint().Version {
+                       shardMap[idp.ShardId] = idp
+               }
+       }
+       // Now merge results from different shards
+       deduplicatedDps := make([]*measurev1.InternalDataPoint, 0, 
len(shardMap))
+       for _, idp := range shardMap {
+               deduplicatedDps = append(deduplicatedDps, idp)
+       }
+       if len(deduplicatedDps) == 1 {
+               return deduplicatedDps
+       }
+       // Merge aggregation results based on function type
+       fieldName := agg.FieldName
+       var result *measurev1.InternalDataPoint
+       switch agg.Function {
+       case modelv1.AggregationFunction_AGGREGATION_FUNCTION_MIN:
+               result = deduplicatedDps[0]
+               for i := 1; i < len(deduplicatedDps); i++ {
+                       dp := deduplicatedDps[i].GetDataPoint()
+                       resultDp := result.GetDataPoint()
+                       fieldVal := getFieldValue(dp, fieldName)
+                       resultFieldVal := getFieldValue(resultDp, fieldName)
+                       if fieldVal != nil && resultFieldVal != nil {
+                               if compareLess(fieldVal, resultFieldVal) {
+                                       result = deduplicatedDps[i]
+                               }
+                       }
+               }
+       case modelv1.AggregationFunction_AGGREGATION_FUNCTION_MAX:
+               result = deduplicatedDps[0]
+               for i := 1; i < len(deduplicatedDps); i++ {
+                       dp := deduplicatedDps[i].GetDataPoint()
+                       resultDp := result.GetDataPoint()
+                       fieldVal := getFieldValue(dp, fieldName)
+                       resultFieldVal := getFieldValue(resultDp, fieldName)
+                       if fieldVal != nil && resultFieldVal != nil {
+                               if compareGreater(fieldVal, resultFieldVal) {
+                                       result = deduplicatedDps[i]
+                               }
+                       }
+               }
+       case modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM, 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_COUNT:
+               // Sum all values
+               result = deduplicatedDps[0]
+               resultDp := result.GetDataPoint()
+               var sumInt int64
+               var sumFloat float64
+               isFloat := false
+               for _, idp := range deduplicatedDps {
+                       dp := idp.GetDataPoint()
+                       fieldVal := getFieldValue(dp, fieldName)
+                       if fieldVal == nil {
+                               continue
+                       }
+                       switch v := fieldVal.Value.(type) {
+                       case *modelv1.FieldValue_Int:
+                               if !isFloat {
+                                       sumInt += v.Int.Value
+                               } else {
+                                       sumFloat += float64(v.Int.Value)
+                               }
+                       case *modelv1.FieldValue_Float:
+                               if !isFloat {
+                                       isFloat = true
+                                       sumFloat = float64(sumInt) + 
v.Float.Value
+                               } else {
+                                       sumFloat += v.Float.Value
+                               }
+                       }
+               }
+               // Update the result field value
+               for i, field := range resultDp.Fields {
+                       if field.Name == fieldName {
+                               if isFloat {
+                                       resultDp.Fields[i].Value = 
&modelv1.FieldValue{Value: &modelv1.FieldValue_Float{Float: 
&modelv1.Float{Value: sumFloat}}}
+                               } else {
+                                       resultDp.Fields[i].Value = 
&modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: 
sumInt}}}
+                               }
+                               break
+                       }
+               }
+       default:
+               // For other functions, just return the first one (should not 
happen for needCompletePushDownAgg)
+               return deduplicatedDps[:1]
+       }
+       return []*measurev1.InternalDataPoint{result}
+}
+
+// getFieldValue extracts the field value from a data point.
+func getFieldValue(dp *measurev1.DataPoint, fieldName string) 
*modelv1.FieldValue {
+       for _, field := range dp.Fields {
+               if field.Name == fieldName {
+                       return field.Value
+               }
+       }
+       return nil
+}
+
+// compareLess compares two field values and returns true if the first is less 
than the second.
+func compareLess(v1, v2 *modelv1.FieldValue) bool {
+       switch val1 := v1.Value.(type) {
+       case *modelv1.FieldValue_Int:
+               if val2, ok := v2.Value.(*modelv1.FieldValue_Int); ok {
+                       return val1.Int.Value < val2.Int.Value
+               }
+       case *modelv1.FieldValue_Float:
+               if val2, ok := v2.Value.(*modelv1.FieldValue_Float); ok {
+                       return val1.Float.Value < val2.Float.Value
+               }
+       }
+       return false
+}
+
+// compareGreater compares two field values and returns true if the first is 
greater than the second.
+func compareGreater(v1, v2 *modelv1.FieldValue) bool {
+       switch val1 := v1.Value.(type) {
+       case *modelv1.FieldValue_Int:
+               if val2, ok := v2.Value.(*modelv1.FieldValue_Int); ok {
+                       return val1.Int.Value > val2.Int.Value
+               }
+       case *modelv1.FieldValue_Float:
+               if val2, ok := v2.Value.(*modelv1.FieldValue_Float); ok {
+                       return val1.Float.Value > val2.Float.Value
+               }
+       }
+       return false
+}
+
 // hashWithShard combines shard_id and group_key into a single hash.
 func hashWithShard(shardID, groupKey uint64) uint64 {
        h := uint64(offset64)
        h = (h ^ shardID) * prime64
        h = (h ^ groupKey) * prime64
        return h
 }
+
+type meanGroup struct {
+       dataPoint  *measurev1.DataPoint
+       shardID    uint32
+       sumInt     int64
+       countInt   int64
+       sumFloat   float64
+       countFloat float64
+}
+
+// mergeMeanAggregation merges mean aggregation results (sum and count) from 
multiple data nodes.
+func mergeMeanAggregation(dataPoints []*measurev1.InternalDataPoint, fieldName 
string, groupByTagsRefs [][]*logical.TagRef) ([]*measurev1.InternalDataPoint, 
error) {
+       groupMap := make(map[uint64]*meanGroup)
+       for _, idp := range dataPoints {
+               dp := idp.GetDataPoint()
+               var groupKey uint64
+               if len(groupByTagsRefs) > 0 {
+                       var keyErr error
+                       groupKey, keyErr = formatGroupByKey(dp, groupByTagsRefs)
+                       if keyErr != nil {
+                               return nil, keyErr
+                       }
+               }
+               var sumVal int64
+               var countVal int64
+               var sumFloat float64
+               var countFloat float64
+               var isFloat bool
+               var sumFound, countFound bool
+               for _, field := range dp.Fields {
+                       if field.Name == fieldName+"_sum" {
+                               if intVal := field.Value.GetInt(); intVal != 
nil {
+                                       sumVal = intVal.Value
+                                       isFloat = false
+                                       sumFound = true
+                               } else if floatVal := field.Value.GetFloat(); 
floatVal != nil {
+                                       sumFloat = floatVal.Value
+                                       isFloat = true
+                                       sumFound = true
+                               }
+                       } else if field.Name == fieldName+"_count" {
+                               if intVal := field.Value.GetInt(); intVal != 
nil {
+                                       countVal = intVal.Value
+                                       countFound = true
+                               } else if floatVal := field.Value.GetFloat(); 
floatVal != nil {
+                                       countFloat = floatVal.Value
+                                       countFound = true
+                               }
+                       }
+               }
+               if !sumFound || !countFound {
+                       continue
+               }
+               if _, exists := groupMap[groupKey]; !exists {
+                       groupMap[groupKey] = &meanGroup{
+                               dataPoint: &measurev1.DataPoint{
+                                       TagFamilies: dp.TagFamilies,
+                               },
+                               shardID: idp.ShardId,
+                       }
+               }
+               mg := groupMap[groupKey]
+               if isFloat {
+                       mg.sumFloat += sumFloat
+                       mg.countFloat += countFloat
+               } else {
+                       mg.sumInt += sumVal
+                       mg.countInt += countVal
+               }
+       }
+       result := make([]*measurev1.InternalDataPoint, 0, len(groupMap))
+       for _, mg := range groupMap {
+               var meanVal *modelv1.FieldValue
+               switch {

Review Comment:
   `mergeMeanAggregation` builds `result` by ranging over a Go map (`for _, mg 
:= range groupMap`), which produces nondeterministic output ordering. The 
integration tests compare response ordering (unless `DisOrder` is set), so this 
can cause flaky failures and unstable API results. Preserve a deterministic 
order (e.g., track first-seen group keys from the input slice and emit results 
in that order, or sort by group tags).



##########
pkg/query/logical/measure/measure_plan_distributed.go:
##########
@@ -311,10 +311,27 @@ func (t *distributedPlan) Execute(ctx context.Context) 
(mi executor.MIterator, e
                span.Tagf("data_point_count", "%d", dataPointCount)
        }
        if t.needCompletePushDownAgg {
+               // deduplicate: remove duplicate results from multiple replicas 
of the same shard
                deduplicatedDps, dedupErr := 
deduplicateAggregatedDataPointsWithShard(pushedDownAggDps, t.groupByTagsRefs)
                if dedupErr != nil {
                        return nil, multierr.Append(err, dedupErr)
                }

Review Comment:
   The new deduplication step keeps the first datapoint per (shard_id, 
groupKey) and ignores `DataPoint.Version`. This can select an older replica 
result nondeterministically. Align with `sortedMIterator.loadDps` behavior by 
preferring the highest `Version` when duplicates are seen.



##########
pkg/query/logical/measure/measure_plan_distributed.go:
##########
@@ -311,10 +311,27 @@ func (t *distributedPlan) Execute(ctx context.Context) 
(mi executor.MIterator, e
                span.Tagf("data_point_count", "%d", dataPointCount)
        }
        if t.needCompletePushDownAgg {
+               // deduplicate: remove duplicate results from multiple replicas 
of the same shard
                deduplicatedDps, dedupErr := 
deduplicateAggregatedDataPointsWithShard(pushedDownAggDps, t.groupByTagsRefs)
                if dedupErr != nil {
                        return nil, multierr.Append(err, dedupErr)
                }
+               // merge: for MEAN, merge sum and count from different shards 
with same groupKey
+               if t.queryTemplate.Agg != nil &&
+                       t.queryTemplate.Agg.GetFunction() == 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_MEAN {
+                       mergedDps, mergeErr := 
mergeMeanAggregation(deduplicatedDps,
+                               t.queryTemplate.Agg.FieldName, 
t.groupByTagsRefs)
+                       if mergeErr != nil {
+                               return nil, multierr.Append(err, mergeErr)
+                       }
+                       return &pushedDownAggregatedIterator{dataPoints: 
mergedDps}, err
+               }

Review Comment:
   In the `needCompletePushDownAgg` path, MEAN always goes through 
`mergeMeanAggregation`, which expects per-shard `field_sum`/`field_count` 
fields. But data nodes only emit those fields when `isDistributedMean` is true 
(currently gated on `criteria.GetGroupBy() != nil`), so a MEAN query without 
groupBy will return datapoints with the normal `fieldName` only and will be 
dropped by `mergeMeanAggregation` (empty result). Consider treating non-groupBy 
MEAN as distributed mean too (emit sum/count), or adding a fallback merge path 
for MEAN without sum/count (and still deduplicate replicas by shard_id).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to