Copilot commented on code in PR #957:
URL:
https://github.com/apache/skywalking-banyandb/pull/957#discussion_r2720825748
##########
pkg/query/logical/measure/measure_analyzer.go:
##########
@@ -119,10 +125,19 @@ func Analyze(criteria *measurev1.QueryRequest, metadata []*commonv1.Metadata, ss
 	}
 	if criteria.GetAgg() != nil {
+		// Check if this is a distributed mean aggregation that needs to return sum and count
+		// This happens when the query is pushed down from liaison node to data node
+		isDistributedMean := false
+		if isDistributed && criteria.GetAgg().GetFunction() == modelv1.AggregationFunction_AGGREGATION_FUNCTION_MEAN &&
+			criteria.GetGroupBy() != nil &&
Review Comment:
`isDistributedMean` is only enabled when `criteria.GetGroupBy() != nil`, but
`DistributedAnalyze` can push down MEAN even when there is no groupBy. In that
case the data node will emit a normal MEAN (fieldName), while the liaison merge
path expects `_sum`/`_count` and will drop results. Consider enabling
distributed-mean mode for non-groupBy MEAN as well (as long as `Top == nil`),
or adjust the pushdown conditions accordingly.
```suggestion
```
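A minimal sketch of the suggested widening, assuming `criteria.GetTop()` exists on `measurev1.QueryRequest` (the comment's `Top == nil` condition); this illustrates the idea rather than the PR's actual code:

```go
// Hedged sketch: enable distributed-mean mode for any pushed-down MEAN,
// with or without groupBy, as long as no TopN is requested, so data nodes
// always emit the _sum/_count pair that the liaison merge path expects.
isDistributedMean := isDistributed &&
	criteria.GetAgg().GetFunction() == modelv1.AggregationFunction_AGGREGATION_FUNCTION_MEAN &&
	criteria.GetTop() == nil // groupBy no longer required
```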
##########
pkg/query/logical/measure/measure_plan_distributed.go:
##########
@@ -568,10 +585,256 @@ func deduplicateAggregatedDataPointsWithShard(dataPoints []*measurev1.InternalDa
 	return result, nil
 }
+// mergeNonGroupByAggregation merges aggregation results from multiple shards when there's no groupBy.
+// For MIN/MAX, takes the min/max value; for SUM/COUNT, sums them up.
+func mergeNonGroupByAggregation(dataPoints []*measurev1.InternalDataPoint, agg *measurev1.QueryRequest_Aggregation) []*measurev1.InternalDataPoint {
+	if len(dataPoints) == 0 {
+		return nil
+	}
+	if len(dataPoints) == 1 {
+		return dataPoints
+	}
+	// Deduplicate by shard_id first (keep the one with highest version)
+	shardMap := make(map[uint32]*measurev1.InternalDataPoint)
+	for _, idp := range dataPoints {
+		existing, exists := shardMap[idp.ShardId]
+		if !exists || idp.GetDataPoint().Version > existing.GetDataPoint().Version {
+			shardMap[idp.ShardId] = idp
+		}
+	}
+	// Now merge results from different shards
+	deduplicatedDps := make([]*measurev1.InternalDataPoint, 0, len(shardMap))
+	for _, idp := range shardMap {
+		deduplicatedDps = append(deduplicatedDps, idp)
+	}
+	if len(deduplicatedDps) == 1 {
+		return deduplicatedDps
+	}
+	// Merge aggregation results based on function type
+	fieldName := agg.FieldName
+	var result *measurev1.InternalDataPoint
+	switch agg.Function {
+	case modelv1.AggregationFunction_AGGREGATION_FUNCTION_MIN:
+		result = deduplicatedDps[0]
+		for i := 1; i < len(deduplicatedDps); i++ {
+			dp := deduplicatedDps[i].GetDataPoint()
+			resultDp := result.GetDataPoint()
+			fieldVal := getFieldValue(dp, fieldName)
+			resultFieldVal := getFieldValue(resultDp, fieldName)
+			if fieldVal != nil && resultFieldVal != nil {
+				if compareLess(fieldVal, resultFieldVal) {
+					result = deduplicatedDps[i]
+				}
+			}
+		}
+	case modelv1.AggregationFunction_AGGREGATION_FUNCTION_MAX:
+		result = deduplicatedDps[0]
+		for i := 1; i < len(deduplicatedDps); i++ {
+			dp := deduplicatedDps[i].GetDataPoint()
+			resultDp := result.GetDataPoint()
+			fieldVal := getFieldValue(dp, fieldName)
+			resultFieldVal := getFieldValue(resultDp, fieldName)
+			if fieldVal != nil && resultFieldVal != nil {
+				if compareGreater(fieldVal, resultFieldVal) {
+					result = deduplicatedDps[i]
+				}
+			}
+		}
+	case modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM, modelv1.AggregationFunction_AGGREGATION_FUNCTION_COUNT:
+		// Sum all values
+		result = deduplicatedDps[0]
+		resultDp := result.GetDataPoint()
+		var sumInt int64
+		var sumFloat float64
+		isFloat := false
+		for _, idp := range deduplicatedDps {
+			dp := idp.GetDataPoint()
+			fieldVal := getFieldValue(dp, fieldName)
+			if fieldVal == nil {
+				continue
+			}
+			switch v := fieldVal.Value.(type) {
+			case *modelv1.FieldValue_Int:
+				if !isFloat {
+					sumInt += v.Int.Value
+				} else {
+					sumFloat += float64(v.Int.Value)
+				}
+			case *modelv1.FieldValue_Float:
+				if !isFloat {
+					isFloat = true
+					sumFloat = float64(sumInt) + v.Float.Value
+				} else {
+					sumFloat += v.Float.Value
+				}
+			}
+		}
+		// Update the result field value
+		for i, field := range resultDp.Fields {
+			if field.Name == fieldName {
+				if isFloat {
+					resultDp.Fields[i].Value = &modelv1.FieldValue{Value: &modelv1.FieldValue_Float{Float: &modelv1.Float{Value: sumFloat}}}
+				} else {
+					resultDp.Fields[i].Value = &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: sumInt}}}
+				}
+				break
+			}
+		}
+	default:
+		// For other functions, just return the first one (should not happen for needCompletePushDownAgg)
+		return deduplicatedDps[:1]
+	}
+	return []*measurev1.InternalDataPoint{result}
+}
+
+// getFieldValue extracts the field value from a data point.
+func getFieldValue(dp *measurev1.DataPoint, fieldName string) *modelv1.FieldValue {
+	for _, field := range dp.Fields {
+		if field.Name == fieldName {
+			return field.Value
+		}
+	}
+	return nil
+}
+
+// compareLess compares two field values and returns true if the first is less than the second.
+func compareLess(v1, v2 *modelv1.FieldValue) bool {
+	switch val1 := v1.Value.(type) {
+	case *modelv1.FieldValue_Int:
+		if val2, ok := v2.Value.(*modelv1.FieldValue_Int); ok {
+			return val1.Int.Value < val2.Int.Value
+		}
+	case *modelv1.FieldValue_Float:
+		if val2, ok := v2.Value.(*modelv1.FieldValue_Float); ok {
+			return val1.Float.Value < val2.Float.Value
+		}
+	}
+	return false
+}
+
+// compareGreater compares two field values and returns true if the first is greater than the second.
+func compareGreater(v1, v2 *modelv1.FieldValue) bool {
+	switch val1 := v1.Value.(type) {
+	case *modelv1.FieldValue_Int:
+		if val2, ok := v2.Value.(*modelv1.FieldValue_Int); ok {
+			return val1.Int.Value > val2.Int.Value
+		}
+	case *modelv1.FieldValue_Float:
+		if val2, ok := v2.Value.(*modelv1.FieldValue_Float); ok {
+			return val1.Float.Value > val2.Float.Value
+		}
+	}
+	return false
+}
+
// hashWithShard combines shard_id and group_key into a single hash.
func hashWithShard(shardID, groupKey uint64) uint64 {
h := uint64(offset64)
h = (h ^ shardID) * prime64
h = (h ^ groupKey) * prime64
return h
}
+
+type meanGroup struct {
+	dataPoint  *measurev1.DataPoint
+	shardID    uint32
+	sumInt     int64
+	countInt   int64
+	sumFloat   float64
+	countFloat float64
+}
+
+// mergeMeanAggregation merges mean aggregation results (sum and count) from multiple data nodes.
+func mergeMeanAggregation(dataPoints []*measurev1.InternalDataPoint, fieldName string, groupByTagsRefs [][]*logical.TagRef) ([]*measurev1.InternalDataPoint, error) {
+	groupMap := make(map[uint64]*meanGroup)
+	for _, idp := range dataPoints {
+		dp := idp.GetDataPoint()
+		var groupKey uint64
+		if len(groupByTagsRefs) > 0 {
+			var keyErr error
+			groupKey, keyErr = formatGroupByKey(dp, groupByTagsRefs)
+			if keyErr != nil {
+				return nil, keyErr
+			}
+		}
+		var sumVal int64
+		var countVal int64
+		var sumFloat float64
+		var countFloat float64
+		var isFloat bool
+		var sumFound, countFound bool
+		for _, field := range dp.Fields {
+			if field.Name == fieldName+"_sum" {
+				if intVal := field.Value.GetInt(); intVal != nil {
+					sumVal = intVal.Value
+					isFloat = false
+					sumFound = true
+				} else if floatVal := field.Value.GetFloat(); floatVal != nil {
+					sumFloat = floatVal.Value
+					isFloat = true
+					sumFound = true
+				}
+			} else if field.Name == fieldName+"_count" {
+				if intVal := field.Value.GetInt(); intVal != nil {
+					countVal = intVal.Value
+					countFound = true
+				} else if floatVal := field.Value.GetFloat(); floatVal != nil {
+					countFloat = floatVal.Value
+					countFound = true
+				}
+			}
+		}
+		if !sumFound || !countFound {
+			continue
+		}
+		if _, exists := groupMap[groupKey]; !exists {
+			groupMap[groupKey] = &meanGroup{
+				dataPoint: &measurev1.DataPoint{
+					TagFamilies: dp.TagFamilies,
+				},
+				shardID: idp.ShardId,
+			}
+		}
+		mg := groupMap[groupKey]
+		if isFloat {
+			mg.sumFloat += sumFloat
+			mg.countFloat += countFloat
+		} else {
+			mg.sumInt += sumVal
+			mg.countInt += countVal
+		}
+	}
+	result := make([]*measurev1.InternalDataPoint, 0, len(groupMap))
+	for _, mg := range groupMap {
+		var meanVal *modelv1.FieldValue
+		switch {
+		case mg.countFloat > 0:
+			mean := mg.sumFloat / mg.countFloat
+			meanVal = &modelv1.FieldValue{
+				Value: &modelv1.FieldValue_Float{
+					Float: &modelv1.Float{Value: mean},
+				},
+			}
Review Comment:
`mergeMeanAggregation` computes the final mean as `sum/count` but does not
apply the same semantics as the existing `aggregation.meanFunc.Val()` (which
clamps results < 1 to 1 when count > 0). This makes distributed MEAN
potentially return 0 or <1 values that a non-distributed MEAN would never
return. Apply the same clamping logic here to keep results consistent across
execution modes.
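A sketch of that clamping at the point where the merged mean is computed; the "clamp to 1 when count > 0" semantics are taken from the comment's description of `aggregation.meanFunc.Val()` and should be verified against that implementation:

```go
// Mirror the reported meanFunc.Val() behavior: a positive count never
// yields a mean below 1, so clamp before building the field value.
mean := mg.sumFloat / mg.countFloat
if mg.countFloat > 0 && mean < 1 {
	mean = 1
}
```

The integer branch (`mg.sumInt` / `mg.countInt`) would need the same guard.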
##########
pkg/query/logical/measure/measure_plan_distributed.go:
##########
@@ -568,10 +585,256 @@ func deduplicateAggregatedDataPointsWithShard(dataPoints []*measurev1.InternalDa
 	return result, nil
 }
+// mergeMeanAggregation merges mean aggregation results (sum and count) from multiple data nodes.
+func mergeMeanAggregation(dataPoints []*measurev1.InternalDataPoint, fieldName string, groupByTagsRefs [][]*logical.TagRef) ([]*measurev1.InternalDataPoint, error) {
+	// ...
+		if !sumFound || !countFound {
+			continue
Review Comment:
`mergeMeanAggregation` silently `continue`s when a datapoint is missing
either the `_sum` or `_count` field, which can turn a partially valid response
into an empty/partial result without surfacing an error. Since
`mergeMeanAggregation` is only used for MEAN in the pushdown path, it should
either return an error on missing fields or explicitly handle a fallback
representation (e.g., already-merged mean).
```suggestion
			return nil, fmt.Errorf("mergeMeanAggregation: datapoint missing %s_sum or %s_count", fieldName, fieldName)
```
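If the fallback route is preferred over a hard error, one hedged sketch (where `passthrough` is a hypothetical `[]*measurev1.InternalDataPoint` declared before the loop and appended to the final result):

```go
// Treat a datapoint that carries only the plain fieldName as an
// already-merged MEAN and pass it through instead of dropping it.
if !sumFound || !countFound {
	if getFieldValue(dp, fieldName) != nil {
		passthrough = append(passthrough, idp)
		continue
	}
	return nil, fmt.Errorf("mergeMeanAggregation: datapoint missing %s_sum or %s_count", fieldName, fieldName)
}
```

Either variant requires importing `fmt`.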
##########
pkg/query/logical/measure/measure_plan_distributed.go:
##########
@@ -568,10 +585,256 @@ func deduplicateAggregatedDataPointsWithShard(dataPoints []*measurev1.InternalDa
 	return result, nil
 }
+// mergeMeanAggregation merges mean aggregation results (sum and count) from multiple data nodes.
+func mergeMeanAggregation(dataPoints []*measurev1.InternalDataPoint, fieldName string, groupByTagsRefs [][]*logical.TagRef) ([]*measurev1.InternalDataPoint, error) {
+	// ...
+	result := make([]*measurev1.InternalDataPoint, 0, len(groupMap))
+	for _, mg := range groupMap {
+		var meanVal *modelv1.FieldValue
+		switch {
Review Comment:
`mergeMeanAggregation` builds `result` by ranging over a Go map (`for _, mg
:= range groupMap`), which produces nondeterministic output ordering. The
integration tests compare response ordering (unless `DisOrder` is set), so this
can cause flaky failures and unstable API results. Preserve a deterministic
order (e.g., track first-seen group keys from the input slice and emit results
in that order, or sort by group tags).
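One deterministic variant, sketched under the assumption that group keys remain `uint64` as in `groupMap` (`firstSeenOrder` is a hypothetical helper): collect the per-datapoint group keys during the first loop, then emit merged results in first-seen order instead of ranging over the map.

```go
// firstSeenOrder returns the distinct keys in the order they first appear,
// so indexing groupMap in this order yields a stable output ordering.
func firstSeenOrder(keys []uint64) []uint64 {
	seen := make(map[uint64]struct{}, len(keys))
	ordered := make([]uint64, 0, len(keys))
	for _, k := range keys {
		if _, ok := seen[k]; ok {
			continue
		}
		seen[k] = struct{}{}
		ordered = append(ordered, k)
	}
	return ordered
}
```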
##########
pkg/query/logical/measure/measure_plan_distributed.go:
##########
@@ -311,10 +311,27 @@ func (t *distributedPlan) Execute(ctx context.Context) (mi executor.MIterator, e
 		span.Tagf("data_point_count", "%d", dataPointCount)
 	}
 	if t.needCompletePushDownAgg {
+		// deduplicate: remove duplicate results from multiple replicas of the same shard
 		deduplicatedDps, dedupErr := deduplicateAggregatedDataPointsWithShard(pushedDownAggDps, t.groupByTagsRefs)
 		if dedupErr != nil {
 			return nil, multierr.Append(err, dedupErr)
 		}
Review Comment:
The new deduplication step keeps the first datapoint per (shard_id,
groupKey) and ignores `DataPoint.Version`. This can select an older replica
result nondeterministically. Align with `sortedMIterator.loadDps` behavior by
preferring the highest `Version` when duplicates are seen.
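A hedged sketch of version-aware selection (`keepNewest` and the map are assumptions; the key would come from `hashWithShard(uint64(idp.ShardId), groupKey)` as in the existing dedup):

```go
// keepNewest stores idp under key only when no entry exists yet or idp
// carries a strictly higher Version, matching sortedMIterator.loadDps.
func keepNewest(dedup map[uint64]*measurev1.InternalDataPoint, key uint64, idp *measurev1.InternalDataPoint) {
	if existing, ok := dedup[key]; !ok || idp.GetDataPoint().Version > existing.GetDataPoint().Version {
		dedup[key] = idp
	}
}
```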
##########
pkg/query/logical/measure/measure_plan_distributed.go:
##########
@@ -311,10 +311,27 @@ func (t *distributedPlan) Execute(ctx context.Context) (mi executor.MIterator, e
 		span.Tagf("data_point_count", "%d", dataPointCount)
 	}
 	if t.needCompletePushDownAgg {
+		// deduplicate: remove duplicate results from multiple replicas of the same shard
 		deduplicatedDps, dedupErr := deduplicateAggregatedDataPointsWithShard(pushedDownAggDps, t.groupByTagsRefs)
 		if dedupErr != nil {
 			return nil, multierr.Append(err, dedupErr)
 		}
+		// merge: for MEAN, merge sum and count from different shards with same groupKey
+		if t.queryTemplate.Agg != nil &&
+			t.queryTemplate.Agg.GetFunction() == modelv1.AggregationFunction_AGGREGATION_FUNCTION_MEAN {
+			mergedDps, mergeErr := mergeMeanAggregation(deduplicatedDps,
+				t.queryTemplate.Agg.FieldName, t.groupByTagsRefs)
+			if mergeErr != nil {
+				return nil, multierr.Append(err, mergeErr)
+			}
+			return &pushedDownAggregatedIterator{dataPoints: mergedDps}, err
+		}
Review Comment:
In the `needCompletePushDownAgg` path, MEAN always goes through
`mergeMeanAggregation`, which expects per-shard `field_sum`/`field_count`
fields. But data nodes only emit those fields when `isDistributedMean` is true
(currently gated on `criteria.GetGroupBy() != nil`), so a MEAN query without
groupBy will return datapoints with the normal `fieldName` only and will be
dropped by `mergeMeanAggregation` (empty result). Consider treating non-groupBy
MEAN as distributed mean too (emit sum/count), or adding a fallback merge path
for MEAN without sum/count (and still deduplicate replicas by shard_id).
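A sketch of one possible liaison-side guard (`hasSumCount` is a hypothetical helper built on the hunk's `getFieldValue`), so MEAN only goes through `mergeMeanAggregation` when the split fields are actually present:

```go
// hasSumCount reports whether a pushed-down datapoint carries the
// distributed-mean field pair; when it does not, the liaison could fall
// back to plain replica dedup by shard_id rather than dropping the point.
func hasSumCount(dp *measurev1.DataPoint, fieldName string) bool {
	return getFieldValue(dp, fieldName+"_sum") != nil &&
		getFieldValue(dp, fieldName+"_count") != nil
}
```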
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]