This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new c12c4ed21 Map-Reduce Aggregation Redesign (#970)
c12c4ed21 is described below

commit c12c4ed214488eed23b3ed608b93906562e1aa84
Author: OmCheeLin <[email protected]>
AuthorDate: Sun Feb 22 17:17:02 2026 +0800

    Map-Reduce Aggregation Redesign (#970)
    
    
    
    * Enhance measure query capabilities with map-reduce aggregation support 
and improve topN post-processor logic. Fix error handling in aggregation 
iterator close method.
    
    ---------
    
    Co-authored-by: Gao Hongtao <[email protected]>
---
 CHANGES.md                                         |   1 +
 api/proto/banyandb/measure/v1/query.proto          |   4 +-
 banyand/measure/topn_post_processor.go             |  33 ++--
 banyand/query/processor.go                         |  15 +-
 docs/api-reference.md                              |   3 +-
 pkg/query/aggregation/aggregation.go               |  90 +++++++++-
 pkg/query/aggregation/function.go                  | 119 +++++++++++++
 pkg/query/logical/measure/measure_analyzer.go      |  27 ++-
 .../logical/measure/measure_plan_aggregation.go    | 185 +++++++++++++++------
 .../logical/measure/measure_plan_distributed.go    |  96 ++++++-----
 pkg/query/logical/measure/topn_analyzer.go         |   4 +-
 test/cases/measure/data/input/group_mean.ql        |  21 +++
 test/cases/measure/data/input/group_mean.yaml      |  34 ++++
 test/cases/measure/data/want/group_mean.yaml       |  54 ++++++
 test/cases/measure/measure.go                      |   1 +
 15 files changed, 546 insertions(+), 141 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 3ffc1b9be..b9c8ff244 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -23,6 +23,7 @@ Release Notes.
   - From: `<data-dir>/property/data/shard-<id>/...`
   - To: `<data-dir>/property/data/<group>/shard-<id>/...`
 - Add a generic snapshot coordination package for atomic snapshot transitions 
across trace and sidx.
+- Support map-reduce aggregation for measure queries: map phase (partial 
aggregation on data nodes) and reduce phase (final aggregation on liaison).
 
 ### Bug Fixes
 
diff --git a/api/proto/banyandb/measure/v1/query.proto 
b/api/proto/banyandb/measure/v1/query.proto
index a3d10adbd..e518762cc 100644
--- a/api/proto/banyandb/measure/v1/query.proto
+++ b/api/proto/banyandb/measure/v1/query.proto
@@ -69,6 +69,8 @@ message InternalDataPoint {
 message InternalQueryRequest {
   // The actual query request
   QueryRequest request = 1;
+  // agg_return_partial when true asks data nodes to return aggregation 
partials (for reduce at liaison)
+  bool agg_return_partial = 2;
 }
 
 // InternalQueryResponse is the internal response for distributed query.
@@ -138,6 +140,6 @@ message QueryRequest {
   bool trace = 13;
   // stages is used to specify the stage of the data points in the lifecycle
   repeated string stages = 14;
-  // rewriteAggTopNResult will rewrite agg result to raw data
+  // rewrite_agg_top_n_result will rewrite agg result to raw data
   bool rewrite_agg_top_n_result = 15;
 }
diff --git a/banyand/measure/topn_post_processor.go 
b/banyand/measure/topn_post_processor.go
index 4d219d73c..a97100c4a 100644
--- a/banyand/measure/topn_post_processor.go
+++ b/banyand/measure/topn_post_processor.go
@@ -70,9 +70,9 @@ func (taggr *topNPostProcessor) Len() int {
 // while for ASC, a max heap has to be built.
 func (taggr *topNPostProcessor) Less(i, j int) bool {
        if taggr.sort == modelv1.Sort_SORT_DESC {
-               return taggr.items[i].int64Func.Val() < 
taggr.items[j].int64Func.Val()
+               return taggr.items[i].mapFunc.Val() < 
taggr.items[j].mapFunc.Val()
        }
-       return taggr.items[i].int64Func.Val() > taggr.items[j].int64Func.Val()
+       return taggr.items[i].mapFunc.Val() > taggr.items[j].mapFunc.Val()
 }
 
 func (taggr *topNPostProcessor) Swap(i, j int) {
@@ -99,9 +99,12 @@ func (taggr *topNPostProcessor) Pop() any {
 }
 
 func (taggr *topNPostProcessor) tryEnqueue(key string, item 
*topNAggregatorItem) {
+       if len(taggr.items) == 0 {
+               return
+       }
        if lowest := taggr.items[0]; lowest != nil {
-               shouldReplace := (taggr.sort == modelv1.Sort_SORT_DESC && 
lowest.int64Func.Val() < item.int64Func.Val()) ||
-                       (taggr.sort != modelv1.Sort_SORT_DESC && 
lowest.int64Func.Val() > item.int64Func.Val())
+               shouldReplace := (taggr.sort == modelv1.Sort_SORT_DESC && 
lowest.mapFunc.Val() < item.mapFunc.Val()) ||
+                       (taggr.sort != modelv1.Sort_SORT_DESC && 
lowest.mapFunc.Val() > item.mapFunc.Val())
 
                if shouldReplace {
                        delete(taggr.cache, lowest.key)
@@ -116,12 +119,12 @@ func (taggr *topNPostProcessor) tryEnqueue(key string, 
item *topNAggregatorItem)
 var _ flow.Element = (*topNAggregatorItem)(nil)
 
 type topNAggregatorItem struct {
-       int64Func aggregation.Func[int64]
-       key       string
-       values    pbv1.EntityValues
-       val       int64
-       version   int64
-       index     int
+       mapFunc aggregation.Map[int64]
+       key     string
+       values  pbv1.EntityValues
+       val     int64
+       version int64
+       index   int
 }
 
 func (n *topNAggregatorItem) GetTags(tagNames []string) []*modelv1.Tag {
@@ -245,18 +248,18 @@ func (taggr *topNPostProcessor) Flush() 
([]*topNAggregatorItem, error) {
                for _, timeline := range taggr.timelines {
                        for _, item := range timeline.items {
                                if exist, found := taggr.cache[item.key]; found 
{
-                                       exist.int64Func.In(item.val)
+                                       exist.mapFunc.In(item.val)
                                        heap.Fix(taggr, exist.index)
                                        continue
                                }
 
-                               aggrFunc, err := 
aggregation.NewFunc[int64](taggr.aggrFunc)
+                               mapFunc, err := 
aggregation.NewMap[int64](taggr.aggrFunc)
                                if err != nil {
                                        return nil, err
                                }
 
-                               item.int64Func = aggrFunc
-                               item.int64Func.In(item.val)
+                               item.mapFunc = mapFunc
+                               item.mapFunc.In(item.val)
 
                                if taggr.Len() < int(taggr.topN) {
                                        taggr.cache[item.key] = item
@@ -300,7 +303,7 @@ func (taggr *topNPostProcessor) valWithAggregation(tagNames 
[]string) ([]*measur
                        Entity: item.GetTags(tagNames),
                        Value: &modelv1.FieldValue{
                                Value: &modelv1.FieldValue_Int{
-                                       Int: &modelv1.Int{Value: 
item.int64Func.Val()},
+                                       Int: &modelv1.Int{Value: 
item.mapFunc.Val()},
                                },
                        },
                }
diff --git a/banyand/query/processor.go b/banyand/query/processor.go
index cbe7d783e..c6efbbda2 100644
--- a/banyand/query/processor.go
+++ b/banyand/query/processor.go
@@ -202,8 +202,13 @@ func buildMeasureContext(measureService measure.Service, 
log *logger.Logger, que
 }
 
 // executeMeasurePlan executes the measure query plan and returns the iterator.
-func executeMeasurePlan(ctx context.Context, queryCriteria 
*measurev1.QueryRequest, mctx *measureExecutionContext) (executor.MIterator, 
logical.Plan, error) {
-       plan, planErr := logical_measure.Analyze(queryCriteria, mctx.metadata, 
mctx.schemas, mctx.ecc)
+func executeMeasurePlan(
+       ctx context.Context,
+       queryCriteria *measurev1.QueryRequest,
+       mctx *measureExecutionContext,
+       emitPartial bool,
+) (executor.MIterator, logical.Plan, error) {
+       plan, planErr := logical_measure.Analyze(queryCriteria, mctx.metadata, 
mctx.schemas, mctx.ecc, emitPartial)
        if planErr != nil {
                return nil, nil, fmt.Errorf("fail to analyze the query request 
for measure %s: %w", queryCriteria.GetName(), planErr)
        }
@@ -343,7 +348,7 @@ func (p *measureQueryProcessor) executeQuery(ctx 
context.Context, queryCriteria
                e.RawJSON("req", logger.Proto(queryCriteria)).Msg("received a 
query event")
        }
 
-       mIterator, plan, execErr := executeMeasurePlan(ctx, queryCriteria, mctx)
+       mIterator, plan, execErr := executeMeasurePlan(ctx, queryCriteria, 
mctx, false)
        if execErr != nil {
                resp = bus.NewMessage(bus.MessageID(now), common.NewError("%v", 
execErr))
                return
@@ -462,7 +467,7 @@ func (p *measureInternalQueryProcessor) Rev(ctx 
context.Context, message bus.Mes
                e.RawJSON("req", logger.Proto(queryCriteria)).Msg("received an 
internal query event")
        }
 
-       mIterator, plan, execErr := executeMeasurePlan(ctx, queryCriteria, mctx)
+       mIterator, plan, execErr := executeMeasurePlan(ctx, queryCriteria, 
mctx, internalRequest.GetAggReturnPartial())
        if execErr != nil {
                resp = bus.NewMessage(bus.MessageID(now), common.NewError("%v", 
execErr))
                return
@@ -507,7 +512,7 @@ func (p *measureInternalQueryProcessor) Rev(ctx 
context.Context, message bus.Mes
                        mctx.ml.Error().Err(rewriteErr).RawJSON("req", 
logger.Proto(queryCriteria)).Msg("fail to rewrite the query criteria")
                } else {
                        rewriteQueryCriteria := 
buildRewriteQueryCriteria(queryCriteria, rewrittenCriteria)
-                       rewriteIterator, _, rewriteExecErr := 
executeMeasurePlan(ctx, rewriteQueryCriteria, mctx)
+                       rewriteIterator, _, rewriteExecErr := 
executeMeasurePlan(ctx, rewriteQueryCriteria, mctx, false)
                        if rewriteExecErr != nil {
                                
mctx.ml.Error().Err(rewriteExecErr).RawJSON("req", 
logger.Proto(rewriteQueryCriteria)).Msg("fail to execute the rewrite query 
plan")
                        } else {
diff --git a/docs/api-reference.md b/docs/api-reference.md
index aa75e0bca..764624cc9 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -960,6 +960,7 @@ Wraps QueryRequest for extensibility.
 | Field | Type | Label | Description |
 | ----- | ---- | ----- | ----------- |
 | request | [QueryRequest](#banyandb-measure-v1-QueryRequest) |  | The actual 
query request |
+| agg_return_partial | [bool](#bool) |  | agg_return_partial when true asks 
data nodes to return aggregation partials (for reduce at liaison) |
 
 
 
@@ -1005,7 +1006,7 @@ QueryRequest is the request contract for query.
 | order_by | [banyandb.model.v1.QueryOrder](#banyandb-model-v1-QueryOrder) |  
| order_by is given to specify the sort for a tag. |
 | trace | [bool](#bool) |  | trace is used to enable trace for the query |
 | stages | [string](#string) | repeated | stages is used to specify the stage 
of the data points in the lifecycle |
-| rewrite_agg_top_n_result | [bool](#bool) |  | rewriteAggTopNResult will 
rewrite agg result to raw data |
+| rewrite_agg_top_n_result | [bool](#bool) |  | rewrite_agg_top_n_result will 
rewrite agg result to raw data |
 
 
 
diff --git a/pkg/query/aggregation/aggregation.go 
b/pkg/query/aggregation/aggregation.go
index a581e670a..c520a39d8 100644
--- a/pkg/query/aggregation/aggregation.go
+++ b/pkg/query/aggregation/aggregation.go
@@ -31,10 +31,26 @@ var (
        errUnSupportedFieldType = errors.New("unsupported field type")
 )
 
-// Func supports aggregation operations.
-type Func[N Number] interface {
+// Partial represents the intermediate result of a Map phase.
+// For most functions only Value is meaningful; for MEAN both Value (sum) and 
Count are used.
+type Partial[N Number] struct {
+       Value N
+       Count N
+}
+
+// Map accumulates raw values and produces aggregation results.
+// It serves as the local accumulator for raw data points.
+type Map[N Number] interface {
        In(N)
        Val() N
+       Partial() Partial[N]
+       Reset()
+}
+
+// Reduce combines intermediate results from Map phases into a final value.
+type Reduce[N Number] interface {
+       Combine(Partial[N])
+       Val() N
        Reset()
 }
 
@@ -43,9 +59,9 @@ type Number interface {
        ~int64 | ~float64
 }
 
-// NewFunc returns a aggregation function based on function type.
-func NewFunc[N Number](af modelv1.AggregationFunction) (Func[N], error) {
-       var result Func[N]
+// NewMap returns a Map aggregation function for the given type.
+func NewMap[N Number](af modelv1.AggregationFunction) (Map[N], error) {
+       var result Map[N]
        switch af {
        case modelv1.AggregationFunction_AGGREGATION_FUNCTION_MEAN:
                result = &meanFunc[N]{zero: zero[N]()}
@@ -64,6 +80,27 @@ func NewFunc[N Number](af modelv1.AggregationFunction) 
(Func[N], error) {
        return result, nil
 }
 
+// NewReduce returns a Reduce aggregation function for the given type.
+func NewReduce[N Number](af modelv1.AggregationFunction) (Reduce[N], error) {
+       var result Reduce[N]
+       switch af {
+       case modelv1.AggregationFunction_AGGREGATION_FUNCTION_MEAN:
+               result = &meanReduceFunc[N]{zero: zero[N]()}
+       case modelv1.AggregationFunction_AGGREGATION_FUNCTION_COUNT:
+               result = &countReduceFunc[N]{zero: zero[N]()}
+       case modelv1.AggregationFunction_AGGREGATION_FUNCTION_MAX:
+               result = &maxReduceFunc[N]{min: minOf[N]()}
+       case modelv1.AggregationFunction_AGGREGATION_FUNCTION_MIN:
+               result = &minReduceFunc[N]{max: maxOf[N]()}
+       case modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM:
+               result = &sumReduceFunc[N]{zero: zero[N]()}
+       default:
+               return nil, errors.WithMessagef(errUnknownFunc, "unknown 
function:%s", modelv1.AggregationFunction_name[int32(af)])
+       }
+       result.Reset()
+       return result, nil
+}
+
 // FromFieldValue transforms modelv1.FieldValue to Number.
 func FromFieldValue[N Number](fieldValue *modelv1.FieldValue) (N, error) {
        switch fieldValue.GetValue().(type) {
@@ -86,6 +123,49 @@ func ToFieldValue[N Number](value N) (*modelv1.FieldValue, 
error) {
        return nil, errUnSupportedFieldType
 }
 
+// PartialToFieldValues converts a Partial to field values for wire transport.
+// For MEAN it returns two values (Value/sum first, Count second); for others 
one value.
+func PartialToFieldValues[N Number](af modelv1.AggregationFunction, p 
Partial[N]) ([]*modelv1.FieldValue, error) {
+       if af == modelv1.AggregationFunction_AGGREGATION_FUNCTION_MEAN {
+               vFv, err := ToFieldValue(p.Value)
+               if err != nil {
+                       return nil, err
+               }
+               cFv, err := ToFieldValue(p.Count)
+               if err != nil {
+                       return nil, err
+               }
+               return []*modelv1.FieldValue{vFv, cFv}, nil
+       }
+       vFv, err := ToFieldValue(p.Value)
+       if err != nil {
+               return nil, err
+       }
+       return []*modelv1.FieldValue{vFv}, nil
+}
+
+// FieldValuesToPartial converts field values from wire transport to a Partial.
+// For MEAN expects two values (sum, count); for others one value (Count will 
be zero).
+func FieldValuesToPartial[N Number](af modelv1.AggregationFunction, fvs 
[]*modelv1.FieldValue) (Partial[N], error) {
+       var p Partial[N]
+       if len(fvs) == 0 {
+               return p, nil
+       }
+       v, err := FromFieldValue[N](fvs[0])
+       if err != nil {
+               return p, err
+       }
+       p.Value = v
+       if af == modelv1.AggregationFunction_AGGREGATION_FUNCTION_MEAN && 
len(fvs) >= 2 {
+               c, err := FromFieldValue[N](fvs[1])
+               if err != nil {
+                       return p, err
+               }
+               p.Count = c
+       }
+       return p, nil
+}
+
 func minOf[N Number]() (r N) {
        switch x := any(&r).(type) {
        case *int64:
diff --git a/pkg/query/aggregation/function.go 
b/pkg/query/aggregation/function.go
index faa019d97..0e009dac2 100644
--- a/pkg/query/aggregation/function.go
+++ b/pkg/query/aggregation/function.go
@@ -39,11 +39,42 @@ func (m meanFunc[N]) Val() N {
        return v
 }
 
+func (m meanFunc[N]) Partial() Partial[N] {
+       return Partial[N]{Value: m.sum, Count: m.count}
+}
+
 func (m *meanFunc[N]) Reset() {
        m.sum = m.zero
        m.count = m.zero
 }
 
+type meanReduceFunc[N Number] struct {
+       sum   N
+       count N
+       zero  N
+}
+
+func (m *meanReduceFunc[N]) Combine(p Partial[N]) {
+       m.sum += p.Value
+       m.count += p.Count
+}
+
+func (m meanReduceFunc[N]) Val() N {
+       if m.count == m.zero {
+               return m.zero
+       }
+       v := m.sum / m.count
+       if v < 1 {
+               return 1
+       }
+       return v
+}
+
+func (m *meanReduceFunc[N]) Reset() {
+       m.sum = m.zero
+       m.count = m.zero
+}
+
 type countFunc[N Number] struct {
        count N
        zero  N
@@ -57,10 +88,31 @@ func (c countFunc[N]) Val() N {
        return c.count
 }
 
+func (c countFunc[N]) Partial() Partial[N] {
+       return Partial[N]{Value: c.count}
+}
+
 func (c *countFunc[N]) Reset() {
        c.count = c.zero
 }
 
+type countReduceFunc[N Number] struct {
+       sum  N
+       zero N
+}
+
+func (c *countReduceFunc[N]) Combine(p Partial[N]) {
+       c.sum += p.Value
+}
+
+func (c countReduceFunc[N]) Val() N {
+       return c.sum
+}
+
+func (c *countReduceFunc[N]) Reset() {
+       c.sum = c.zero
+}
+
 type sumFunc[N Number] struct {
        sum  N
        zero N
@@ -74,10 +126,31 @@ func (s sumFunc[N]) Val() N {
        return s.sum
 }
 
+func (s sumFunc[N]) Partial() Partial[N] {
+       return Partial[N]{Value: s.sum}
+}
+
 func (s *sumFunc[N]) Reset() {
        s.sum = s.zero
 }
 
+type sumReduceFunc[N Number] struct {
+       sum  N
+       zero N
+}
+
+func (s *sumReduceFunc[N]) Combine(p Partial[N]) {
+       s.sum += p.Value
+}
+
+func (s sumReduceFunc[N]) Val() N {
+       return s.sum
+}
+
+func (s *sumReduceFunc[N]) Reset() {
+       s.sum = s.zero
+}
+
 type maxFunc[N Number] struct {
        val N
        min N
@@ -93,10 +166,33 @@ func (m maxFunc[N]) Val() N {
        return m.val
 }
 
+func (m maxFunc[N]) Partial() Partial[N] {
+       return Partial[N]{Value: m.val}
+}
+
 func (m *maxFunc[N]) Reset() {
        m.val = m.min
 }
 
+type maxReduceFunc[N Number] struct {
+       val N
+       min N
+}
+
+func (m *maxReduceFunc[N]) Combine(p Partial[N]) {
+       if p.Value > m.val {
+               m.val = p.Value
+       }
+}
+
+func (m maxReduceFunc[N]) Val() N {
+       return m.val
+}
+
+func (m *maxReduceFunc[N]) Reset() {
+       m.val = m.min
+}
+
 type minFunc[N Number] struct {
        val N
        max N
@@ -112,6 +208,29 @@ func (m minFunc[N]) Val() N {
        return m.val
 }
 
+func (m minFunc[N]) Partial() Partial[N] {
+       return Partial[N]{Value: m.val}
+}
+
 func (m *minFunc[N]) Reset() {
        m.val = m.max
 }
+
+type minReduceFunc[N Number] struct {
+       val N
+       max N
+}
+
+func (m *minReduceFunc[N]) Combine(p Partial[N]) {
+       if m.val == m.max || p.Value < m.val {
+               m.val = p.Value
+       }
+}
+
+func (m minReduceFunc[N]) Val() N {
+       return m.val
+}
+
+func (m *minReduceFunc[N]) Reset() {
+       m.val = m.max
+}
diff --git a/pkg/query/logical/measure/measure_analyzer.go 
b/pkg/query/logical/measure/measure_analyzer.go
index f92c5127e..d096b14ac 100644
--- a/pkg/query/logical/measure/measure_analyzer.go
+++ b/pkg/query/logical/measure/measure_analyzer.go
@@ -24,7 +24,6 @@ import (
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
-       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/pkg/query/executor"
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
 )
@@ -55,7 +54,9 @@ func BuildSchema(md *databasev1.Measure, indexRules 
[]*databasev1.IndexRule) (lo
 }
 
 // Analyze converts logical expressions to executable operation tree 
represented by Plan.
-func Analyze(criteria *measurev1.QueryRequest, metadata []*commonv1.Metadata, 
ss []logical.Schema, ecc []executor.MeasureExecutionContext) (logical.Plan, 
error) {
+func Analyze(criteria *measurev1.QueryRequest, metadata []*commonv1.Metadata, 
ss []logical.Schema,
+       ecc []executor.MeasureExecutionContext, emitPartial bool,
+) (logical.Plan, error) {
        if len(metadata) != len(ss) {
                return nil, fmt.Errorf("number of schemas %d not equal to 
metadata count %d", len(ss), len(metadata))
        }
@@ -123,6 +124,8 @@ func Analyze(criteria *measurev1.QueryRequest, metadata 
[]*commonv1.Metadata, ss
                        logical.NewField(criteria.GetAgg().GetFieldName()),
                        criteria.GetAgg().GetFunction(),
                        criteria.GetGroupBy() != nil,
+                       emitPartial,
+                       false,
                )
                pushedLimit = math.MaxInt
        }
@@ -157,16 +160,8 @@ func DistributedAnalyze(criteria *measurev1.QueryRequest, 
ss []logical.Schema) (
                }
        }
 
-       // TODO: to support all aggregation functions
-       needCompletePushDownAgg := criteria.GetAgg() != nil &&
-               (criteria.GetAgg().GetFunction() == 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_MAX ||
-                       criteria.GetAgg().GetFunction() == 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_MIN ||
-                       criteria.GetAgg().GetFunction() == 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM ||
-                       criteria.GetAgg().GetFunction() == 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_COUNT) &&
-               criteria.GetTop() == nil
-
-       // parse fields
-       plan := newUnresolvedDistributed(criteria, needCompletePushDownAgg)
+       pushDownAgg := criteria.GetAgg() != nil && criteria.GetTop() == nil
+       plan := newUnresolvedDistributed(criteria, pushDownAgg)
 
        // parse limit and offset
        limitParameter := criteria.GetLimit()
@@ -181,14 +176,12 @@ func DistributedAnalyze(criteria *measurev1.QueryRequest, 
ss []logical.Schema) (
        }
 
        if criteria.GetAgg() != nil {
-               aggrFunc := criteria.GetAgg().GetFunction()
-               if needCompletePushDownAgg && aggrFunc == 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_COUNT {
-                       aggrFunc = 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM
-               }
                plan = newUnresolvedAggregation(plan,
                        logical.NewField(criteria.GetAgg().GetFieldName()),
-                       aggrFunc,
+                       criteria.GetAgg().GetFunction(),
                        criteria.GetGroupBy() != nil,
+                       false,       // emitPartial: liaison does not emit 
partial
+                       pushDownAgg, // reduceMode: only reduce partials when 
push-down is active (no TopN)
                )
                pushedLimit = math.MaxInt
        }
diff --git a/pkg/query/logical/measure/measure_plan_aggregation.go 
b/pkg/query/logical/measure/measure_plan_aggregation.go
index ec2b06a68..cd6f75847 100644
--- a/pkg/query/logical/measure/measure_plan_aggregation.go
+++ b/pkg/query/logical/measure/measure_plan_aggregation.go
@@ -32,25 +32,116 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
 )
 
+const aggCountFieldName = "__agg_count"
+
 var (
        _ logical.UnresolvedPlan = (*unresolvedAggregation)(nil)
 
        errUnsupportedAggregationField = errors.New("unsupported aggregation 
operation on this field")
 )
 
+// aggAccumulator abstracts the aggregation logic for both map and reduce 
modes.
+// It is injected into the existing iterators to avoid creating separate 
iterator types.
+type aggAccumulator[N aggregation.Number] interface {
+       Feed(dp *measurev1.DataPoint, fieldIdx int) error
+       Result(fieldName string) ([]*measurev1.DataPoint_Field, error)
+       Reset()
+}
+
+// mapAccumulator implements aggAccumulator for the map phase (data node side).
+type mapAccumulator[N aggregation.Number] struct {
+       mapFunc     aggregation.Map[N]
+       aggrType    modelv1.AggregationFunction
+       emitPartial bool
+}
+
+func (a *mapAccumulator[N]) Feed(dp *measurev1.DataPoint, fieldIdx int) error {
+       v, parseErr := 
aggregation.FromFieldValue[N](dp.GetFields()[fieldIdx].GetValue())
+       if parseErr != nil {
+               return parseErr
+       }
+       a.mapFunc.In(v)
+       return nil
+}
+
+func (a *mapAccumulator[N]) Result(fieldName string) 
([]*measurev1.DataPoint_Field, error) {
+       if a.emitPartial {
+               part := a.mapFunc.Partial()
+               fvs, partErr := aggregation.PartialToFieldValues(a.aggrType, 
part)
+               if partErr != nil {
+                       return nil, partErr
+               }
+               fields := make([]*measurev1.DataPoint_Field, len(fvs))
+               for idx, fv := range fvs {
+                       name := fieldName
+                       if idx > 0 {
+                               name = aggCountFieldName
+                       }
+                       fields[idx] = &measurev1.DataPoint_Field{Name: name, 
Value: fv}
+               }
+               return fields, nil
+       }
+       val, valErr := aggregation.ToFieldValue(a.mapFunc.Val())
+       if valErr != nil {
+               return nil, valErr
+       }
+       return []*measurev1.DataPoint_Field{{Name: fieldName, Value: val}}, nil
+}
+
+func (a *mapAccumulator[N]) Reset() {
+       a.mapFunc.Reset()
+}
+
+// reduceAccumulator implements aggAccumulator for the reduce phase (liaison 
side).
+type reduceAccumulator[N aggregation.Number] struct {
+       reduceFunc aggregation.Reduce[N]
+       aggrType   modelv1.AggregationFunction
+}
+
+func (a *reduceAccumulator[N]) Feed(dp *measurev1.DataPoint, _ int) error {
+       fvs := make([]*modelv1.FieldValue, len(dp.GetFields()))
+       for idx, f := range dp.GetFields() {
+               fvs[idx] = f.GetValue()
+       }
+       part, partErr := aggregation.FieldValuesToPartial[N](a.aggrType, fvs)
+       if partErr != nil {
+               return partErr
+       }
+       a.reduceFunc.Combine(part)
+       return nil
+}
+
+func (a *reduceAccumulator[N]) Result(fieldName string) 
([]*measurev1.DataPoint_Field, error) {
+       val, valErr := aggregation.ToFieldValue(a.reduceFunc.Val())
+       if valErr != nil {
+               return nil, valErr
+       }
+       return []*measurev1.DataPoint_Field{{Name: fieldName, Value: val}}, nil
+}
+
+func (a *reduceAccumulator[N]) Reset() {
+       a.reduceFunc.Reset()
+}
+
 type unresolvedAggregation struct {
        unresolvedInput  logical.UnresolvedPlan
        aggregationField *logical.Field
        aggrFunc         modelv1.AggregationFunction
        isGroup          bool
+       emitPartial      bool
+       reduceMode       bool
 }
 
-func newUnresolvedAggregation(input logical.UnresolvedPlan, aggrField 
*logical.Field, aggrFunc modelv1.AggregationFunction, isGroup bool) 
logical.UnresolvedPlan {
+func newUnresolvedAggregation(input logical.UnresolvedPlan, aggrField 
*logical.Field, aggrFunc modelv1.AggregationFunction,
+       isGroup bool, emitPartial bool, reduceMode bool,
+) logical.UnresolvedPlan {
        return &unresolvedAggregation{
                unresolvedInput:  input,
                aggrFunc:         aggrFunc,
                aggregationField: aggrField,
                isGroup:          isGroup,
+               emitPartial:      emitPartial,
+               reduceMode:       reduceMode,
        }
 }
 
@@ -83,7 +174,7 @@ type aggregationPlan[N aggregation.Number] struct {
        *logical.Parent
        schema              logical.Schema
        aggregationFieldRef *logical.FieldRef
-       aggrFunc            aggregation.Func[N]
+       accumulator         aggAccumulator[N]
        aggrType            modelv1.AggregationFunction
        isGroup             bool
 }
@@ -91,9 +182,19 @@ type aggregationPlan[N aggregation.Number] struct {
 func newAggregationPlan[N aggregation.Number](gba *unresolvedAggregation, 
prevPlan logical.Plan,
        measureSchema logical.Schema, fieldRef *logical.FieldRef,
 ) (*aggregationPlan[N], error) {
-       aggrFunc, err := aggregation.NewFunc[N](gba.aggrFunc)
-       if err != nil {
-               return nil, err
+       var acc aggAccumulator[N]
+       if gba.reduceMode {
+               reduceFunc, reduceErr := aggregation.NewReduce[N](gba.aggrFunc)
+               if reduceErr != nil {
+                       return nil, reduceErr
+               }
+               acc = &reduceAccumulator[N]{reduceFunc: reduceFunc, aggrType: 
gba.aggrFunc}
+       } else {
+               mapFunc, mapErr := aggregation.NewMap[N](gba.aggrFunc)
+               if mapErr != nil {
+                       return nil, mapErr
+               }
+               acc = &mapAccumulator[N]{mapFunc: mapFunc, aggrType: 
gba.aggrFunc, emitPartial: gba.emitPartial}
        }
        return &aggregationPlan[N]{
                Parent: &logical.Parent{
@@ -101,8 +202,9 @@ func newAggregationPlan[N aggregation.Number](gba 
*unresolvedAggregation, prevPl
                        Input:           prevPlan,
                },
                schema:              measureSchema,
-               aggrFunc:            aggrFunc,
+               accumulator:         acc,
                aggregationFieldRef: fieldRef,
+               aggrType:            gba.aggrFunc,
                isGroup:             gba.isGroup,
        }, nil
 }
@@ -128,28 +230,27 @@ func (g *aggregationPlan[N]) Execute(ec context.Context) 
(executor.MIterator, er
                return nil, err
        }
        if g.isGroup {
-               return newAggGroupMIterator(iter, g.aggregationFieldRef, 
g.aggrFunc), nil
+               return newAggGroupMIterator[N](iter, g.aggregationFieldRef, 
g.accumulator), nil
        }
-       return newAggAllIterator(iter, g.aggregationFieldRef, g.aggrFunc), nil
+       return newAggAllIterator[N](iter, g.aggregationFieldRef, 
g.accumulator), nil
 }
 
 type aggGroupIterator[N aggregation.Number] struct {
        prev                executor.MIterator
        aggregationFieldRef *logical.FieldRef
-       aggrFunc            aggregation.Func[N]
-
-       err error
+       accumulator         aggAccumulator[N]
+       err                 error
 }
 
 func newAggGroupMIterator[N aggregation.Number](
        prev executor.MIterator,
        aggregationFieldRef *logical.FieldRef,
-       aggrFunc aggregation.Func[N],
+       accumulator aggAccumulator[N],
 ) executor.MIterator {
        return &aggGroupIterator[N]{
                prev:                prev,
                aggregationFieldRef: aggregationFieldRef,
-               aggrFunc:            aggrFunc,
+               accumulator:         accumulator,
        }
 }
 
@@ -164,20 +265,16 @@ func (ami *aggGroupIterator[N]) Current() 
[]*measurev1.InternalDataPoint {
        if ami.err != nil {
                return nil
        }
-       ami.aggrFunc.Reset()
+       ami.accumulator.Reset()
        group := ami.prev.Current()
        var resultDp *measurev1.DataPoint
        var shardID uint32
        for _, idp := range group {
                dp := idp.GetDataPoint()
-               value := dp.GetFields()[ami.aggregationFieldRef.Spec.FieldIdx].
-                       GetValue()
-               v, err := aggregation.FromFieldValue[N](value)
-               if err != nil {
-                       ami.err = err
+               if feedErr := ami.accumulator.Feed(dp, 
ami.aggregationFieldRef.Spec.FieldIdx); feedErr != nil {
+                       ami.err = feedErr
                        return nil
                }
-               ami.aggrFunc.In(v)
                if resultDp != nil {
                        continue
                }
@@ -189,17 +286,12 @@ func (ami *aggGroupIterator[N]) Current() 
[]*measurev1.InternalDataPoint {
        if resultDp == nil {
                return nil
        }
-       val, err := aggregation.ToFieldValue(ami.aggrFunc.Val())
-       if err != nil {
-               ami.err = err
+       fields, resultErr := 
ami.accumulator.Result(ami.aggregationFieldRef.Field.Name)
+       if resultErr != nil {
+               ami.err = resultErr
                return nil
        }
-       resultDp.Fields = []*measurev1.DataPoint_Field{
-               {
-                       Name:  ami.aggregationFieldRef.Field.Name,
-                       Value: val,
-               },
-       }
+       resultDp.Fields = fields
        return []*measurev1.InternalDataPoint{{DataPoint: resultDp, ShardId: 
shardID}}
 }
 
@@ -210,21 +302,20 @@ func (ami *aggGroupIterator[N]) Close() error {
 type aggAllIterator[N aggregation.Number] struct {
        prev                executor.MIterator
        aggregationFieldRef *logical.FieldRef
-       aggrFunc            aggregation.Func[N]
-
-       result *measurev1.DataPoint
-       err    error
+       accumulator         aggAccumulator[N]
+       result              *measurev1.DataPoint
+       err                 error
 }
 
 func newAggAllIterator[N aggregation.Number](
        prev executor.MIterator,
        aggregationFieldRef *logical.FieldRef,
-       aggrFunc aggregation.Func[N],
+       accumulator aggAccumulator[N],
 ) executor.MIterator {
        return &aggAllIterator[N]{
                prev:                prev,
                aggregationFieldRef: aggregationFieldRef,
-               aggrFunc:            aggrFunc,
+               accumulator:         accumulator,
        }
 }
 
@@ -237,14 +328,10 @@ func (ami *aggAllIterator[N]) Next() bool {
                group := ami.prev.Current()
                for _, idp := range group {
                        dp := idp.GetDataPoint()
-                       value := 
dp.GetFields()[ami.aggregationFieldRef.Spec.FieldIdx].
-                               GetValue()
-                       v, err := aggregation.FromFieldValue[N](value)
-                       if err != nil {
-                               ami.err = err
+                       if feedErr := ami.accumulator.Feed(dp, 
ami.aggregationFieldRef.Spec.FieldIdx); feedErr != nil {
+                               ami.err = feedErr
                                return false
                        }
-                       ami.aggrFunc.In(v)
                        if resultDp != nil {
                                continue
                        }
@@ -256,17 +343,12 @@ func (ami *aggAllIterator[N]) Next() bool {
        if resultDp == nil {
                return false
        }
-       val, err := aggregation.ToFieldValue(ami.aggrFunc.Val())
-       if err != nil {
-               ami.err = err
+       fields, resultErr := 
ami.accumulator.Result(ami.aggregationFieldRef.Field.Name)
+       if resultErr != nil {
+               ami.err = resultErr
                return false
        }
-       resultDp.Fields = []*measurev1.DataPoint_Field{
-               {
-                       Name:  ami.aggregationFieldRef.Field.Name,
-                       Value: val,
-               },
-       }
+       resultDp.Fields = fields
        ami.result = resultDp
        return true
 }
@@ -275,10 +357,9 @@ func (ami *aggAllIterator[N]) Current() 
[]*measurev1.InternalDataPoint {
        if ami.result == nil {
                return nil
        }
-       // For aggregation across all data, shard ID is not applicable
        return []*measurev1.InternalDataPoint{{DataPoint: ami.result, ShardId: 
0}}
 }
 
 func (ami *aggAllIterator[N]) Close() error {
-       return ami.prev.Close()
+       return multierr.Combine(ami.err, ami.prev.Close())
 }
diff --git a/pkg/query/logical/measure/measure_plan_distributed.go 
b/pkg/query/logical/measure/measure_plan_distributed.go
index d81d3f682..69fce0659 100644
--- a/pkg/query/logical/measure/measure_plan_distributed.go
+++ b/pkg/query/logical/measure/measure_plan_distributed.go
@@ -103,15 +103,15 @@ func (as *pushDownAggSchema) Children() []logical.Schema {
 }
 
 type unresolvedDistributed struct {
-       originalQuery           *measurev1.QueryRequest
-       groupByEntity           bool
-       needCompletePushDownAgg bool
+       originalQuery *measurev1.QueryRequest
+       groupByEntity bool
+       pushDownAgg   bool
 }
 
-func newUnresolvedDistributed(query *measurev1.QueryRequest, 
needCompletePushDownAgg bool) logical.UnresolvedPlan {
+func newUnresolvedDistributed(query *measurev1.QueryRequest, pushDownAgg bool) 
logical.UnresolvedPlan {
        return &unresolvedDistributed{
-               originalQuery:           query,
-               needCompletePushDownAgg: needCompletePushDownAgg,
+               originalQuery: query,
+               pushDownAgg:   pushDownAgg,
        }
 }
 
@@ -150,7 +150,7 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema) 
(logical.Plan, error)
                Limit:           limit + ud.originalQuery.Offset,
                OrderBy:         ud.originalQuery.OrderBy,
        }
-       if ud.needCompletePushDownAgg {
+       if ud.pushDownAgg {
                temp.GroupBy = ud.originalQuery.GroupBy
                temp.Agg = ud.originalQuery.Agg
        }
@@ -163,7 +163,7 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema) 
(logical.Plan, error)
        }
        // Prepare groupBy tags refs if needed for deduplication
        var groupByTagsRefs [][]*logical.TagRef
-       if ud.needCompletePushDownAgg && ud.originalQuery.GetGroupBy() != nil {
+       if ud.pushDownAgg && ud.originalQuery.GetGroupBy() != nil {
                groupByTags := 
logical.ToTags(ud.originalQuery.GetGroupBy().GetTagProjection())
                var err error
                groupByTagsRefs, err = s.CreateTagRef(groupByTags...)
@@ -179,12 +179,12 @@ func (ud *unresolvedDistributed) Analyze(s 
logical.Schema) (logical.Plan, error)
                        return nil, fmt.Errorf("entity tag %s not found", e)
                }
                result := &distributedPlan{
-                       queryTemplate:           temp,
-                       s:                       s,
-                       sortByTime:              false,
-                       sortTagSpec:             *sortTagSpec,
-                       needCompletePushDownAgg: ud.needCompletePushDownAgg,
-                       groupByTagsRefs:         groupByTagsRefs,
+                       queryTemplate:   temp,
+                       s:               s,
+                       sortByTime:      false,
+                       sortTagSpec:     *sortTagSpec,
+                       pushDownAgg:     ud.pushDownAgg,
+                       groupByTagsRefs: groupByTagsRefs,
                }
                if ud.originalQuery.OrderBy != nil && 
ud.originalQuery.OrderBy.Sort == modelv1.Sort_SORT_DESC {
                        result.desc = true
@@ -193,20 +193,20 @@ func (ud *unresolvedDistributed) Analyze(s 
logical.Schema) (logical.Plan, error)
        }
        if ud.originalQuery.OrderBy == nil {
                return &distributedPlan{
-                       queryTemplate:           temp,
-                       s:                       s,
-                       sortByTime:              true,
-                       needCompletePushDownAgg: ud.needCompletePushDownAgg,
-                       groupByTagsRefs:         groupByTagsRefs,
+                       queryTemplate:   temp,
+                       s:               s,
+                       sortByTime:      true,
+                       pushDownAgg:     ud.pushDownAgg,
+                       groupByTagsRefs: groupByTagsRefs,
                }, nil
        }
        if ud.originalQuery.OrderBy.IndexRuleName == "" {
                result := &distributedPlan{
-                       queryTemplate:           temp,
-                       s:                       s,
-                       sortByTime:              true,
-                       needCompletePushDownAgg: ud.needCompletePushDownAgg,
-                       groupByTagsRefs:         groupByTagsRefs,
+                       queryTemplate:   temp,
+                       s:               s,
+                       sortByTime:      true,
+                       pushDownAgg:     ud.pushDownAgg,
+                       groupByTagsRefs: groupByTagsRefs,
                }
                if ud.originalQuery.OrderBy.Sort == modelv1.Sort_SORT_DESC {
                        result.desc = true
@@ -225,12 +225,12 @@ func (ud *unresolvedDistributed) Analyze(s 
logical.Schema) (logical.Plan, error)
                return nil, fmt.Errorf("tag %s not found", indexRule.Tags[0])
        }
        result := &distributedPlan{
-               queryTemplate:           temp,
-               s:                       s,
-               sortByTime:              false,
-               sortTagSpec:             *sortTagSpec,
-               needCompletePushDownAgg: ud.needCompletePushDownAgg,
-               groupByTagsRefs:         groupByTagsRefs,
+               queryTemplate:   temp,
+               s:               s,
+               sortByTime:      false,
+               sortTagSpec:     *sortTagSpec,
+               pushDownAgg:     ud.pushDownAgg,
+               groupByTagsRefs: groupByTagsRefs,
        }
        if ud.originalQuery.OrderBy.Sort == modelv1.Sort_SORT_DESC {
                result.desc = true
@@ -239,14 +239,14 @@ func (ud *unresolvedDistributed) Analyze(s 
logical.Schema) (logical.Plan, error)
 }
 
 type distributedPlan struct {
-       s                       logical.Schema
-       queryTemplate           *measurev1.QueryRequest
-       sortTagSpec             logical.TagSpec
-       groupByTagsRefs         [][]*logical.TagRef
-       maxDataPointsSize       uint32
-       sortByTime              bool
-       desc                    bool
-       needCompletePushDownAgg bool
+       s                 logical.Schema
+       queryTemplate     *measurev1.QueryRequest
+       sortTagSpec       logical.TagSpec
+       groupByTagsRefs   [][]*logical.TagRef
+       maxDataPointsSize uint32
+       sortByTime        bool
+       desc              bool
+       pushDownAgg       bool
 }
 
 func (t *distributedPlan) Execute(ctx context.Context) (mi executor.MIterator, 
err error) {
@@ -272,7 +272,7 @@ func (t *distributedPlan) Execute(ctx context.Context) (mi 
executor.MIterator, e
                        }
                }()
        }
-       internalRequest := &measurev1.InternalQueryRequest{Request: 
queryRequest}
+       internalRequest := &measurev1.InternalQueryRequest{Request: 
queryRequest, AggReturnPartial: t.pushDownAgg}
        ff, broadcastErr := dctx.Broadcast(defaultQueryTimeout, 
data.TopicInternalMeasureQuery,
                
bus.NewMessageWithNodeSelectors(bus.MessageID(dctx.TimeRange().Begin.Nanos), 
dctx.NodeSelectors(), dctx.TimeRange(), internalRequest))
        if broadcastErr != nil {
@@ -292,7 +292,7 @@ func (t *distributedPlan) Execute(ctx context.Context) (mi 
executor.MIterator, e
                                if span != nil {
                                        span.AddSubTrace(d.Trace)
                                }
-                               if t.needCompletePushDownAgg {
+                               if t.pushDownAgg {
                                        pushedDownAggDps = 
append(pushedDownAggDps, d.DataPoints...)
                                        dataPointCount += len(d.DataPoints)
                                        continue
@@ -310,7 +310,7 @@ func (t *distributedPlan) Execute(ctx context.Context) (mi 
executor.MIterator, e
                span.Tagf("response_count", "%d", responseCount)
                span.Tagf("data_point_count", "%d", dataPointCount)
        }
-       if t.needCompletePushDownAgg {
+       if t.pushDownAgg {
                deduplicatedDps, dedupErr := 
deduplicateAggregatedDataPointsWithShard(pushedDownAggDps, t.groupByTagsRefs)
                if dedupErr != nil {
                        return nil, multierr.Append(err, dedupErr)
@@ -333,7 +333,7 @@ func (t *distributedPlan) Children() []logical.Plan {
 }
 
 func (t *distributedPlan) Schema() logical.Schema {
-       if t.needCompletePushDownAgg {
+       if t.pushDownAgg {
                return &pushDownAggSchema{
                        originalSchema:   t.s,
                        aggregationField: 
logical.NewField(t.queryTemplate.Agg.FieldName),
@@ -546,7 +546,16 @@ func (s *pushedDownAggregatedIterator) Close() error {
 // of the same shard, while preserving results from different shards.
 func deduplicateAggregatedDataPointsWithShard(dataPoints 
[]*measurev1.InternalDataPoint, groupByTagsRefs [][]*logical.TagRef) 
([]*measurev1.InternalDataPoint, error) {
        if len(groupByTagsRefs) == 0 {
-               return dataPoints, nil
+               // No group-by: deduplicate by shard_id only
+               seen := make(map[uint32]struct{})
+               result := make([]*measurev1.InternalDataPoint, 0, 
len(dataPoints))
+               for _, idp := range dataPoints {
+                       if _, exists := seen[idp.GetShardId()]; !exists {
+                               seen[idp.GetShardId()] = struct{}{}
+                               result = append(result, idp)
+                       }
+               }
+               return result, nil
        }
        // key = hash(shard_id, group_key)
        // Same shard with same group key will be deduplicated
@@ -558,7 +567,6 @@ func deduplicateAggregatedDataPointsWithShard(dataPoints 
[]*measurev1.InternalDa
                if keyErr != nil {
                        return nil, keyErr
                }
-               // Include shard_id in key calculation
                key := hashWithShard(uint64(idp.ShardId), groupKey)
                if _, exists := groupMap[key]; !exists {
                        groupMap[key] = struct{}{}
diff --git a/pkg/query/logical/measure/topn_analyzer.go 
b/pkg/query/logical/measure/topn_analyzer.go
index 34aced932..cf2d87b68 100644
--- a/pkg/query/logical/measure/topn_analyzer.go
+++ b/pkg/query/logical/measure/topn_analyzer.go
@@ -116,7 +116,9 @@ func TopNAnalyze(criteria *measurev1.TopNRequest, 
sourceMeasureSchemaList []*dat
                plan = newUnresolvedAggregation(plan,
                        &logical.Field{Name: topNAggSchema.FieldName},
                        criteria.GetAgg(),
-                       true)
+                       true,
+                       false,
+                       false)
        }
 
        plan = top(plan, &measurev1.QueryRequest_Top{
diff --git a/test/cases/measure/data/input/group_mean.ql 
b/test/cases/measure/data/input/group_mean.ql
new file mode 100644
index 000000000..61e648a32
--- /dev/null
+++ b/test/cases/measure/data/input/group_mean.ql
@@ -0,0 +1,21 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+SELECT id, total::field, value::field, MEAN(value) FROM MEASURE 
service_cpm_minute IN sw_metric
+TIME > '-15m'
+GROUP BY id, value
\ No newline at end of file
diff --git a/test/cases/measure/data/input/group_mean.yaml 
b/test/cases/measure/data/input/group_mean.yaml
new file mode 100644
index 000000000..b35af2e24
--- /dev/null
+++ b/test/cases/measure/data/input/group_mean.yaml
@@ -0,0 +1,34 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "service_cpm_minute"
+groups: ["sw_metric"]
+tagProjection:
+  tagFamilies:
+  - name: "default"
+    tags: ["id"]
+fieldProjection:
+  names: ["total", "value"]
+groupBy:
+  tagProjection:
+    tagFamilies:
+    - name: "default"
+      tags: ["id"]
+  fieldName: "value"
+agg:
+  function: "AGGREGATION_FUNCTION_MEAN"
+  fieldName: "value"
\ No newline at end of file
diff --git a/test/cases/measure/data/want/group_mean.yaml 
b/test/cases/measure/data/want/group_mean.yaml
new file mode 100644
index 000000000..82d0e8cdd
--- /dev/null
+++ b/test/cases/measure/data/want/group_mean.yaml
@@ -0,0 +1,54 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+dataPoints:
+- fields:
+  - name: value
+    value:
+      int:
+        value: "2"
+  tagFamilies:
+  - name: default
+    tags:
+    - key: id
+      value:
+        str:
+          value: svc1
+- fields:
+  - name: value
+    value:
+      int:
+        value: "4"
+  tagFamilies:
+  - name: default
+    tags:
+    - key: id
+      value:
+        str:
+          value: svc2
+- fields:
+  - name: value
+    value:
+      int:
+        value: "6"
+  tagFamilies:
+  - name: default
+    tags:
+    - key: id
+      value:
+        str:
+          value: svc3
diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go
index 2679c8b33..fadf4ee02 100644
--- a/test/cases/measure/measure.go
+++ b/test/cases/measure/measure.go
@@ -52,6 +52,7 @@ var _ = g.DescribeTable("Scanning Measures", verify,
        g.Entry("group and min", helpers.Args{Input: "group_min", Duration: 25 
* time.Minute, Offset: -20 * time.Minute}),
        g.Entry("group and sum", helpers.Args{Input: "group_sum", Duration: 25 
* time.Minute, Offset: -20 * time.Minute}),
        g.Entry("group and count", helpers.Args{Input: "group_count", Duration: 
25 * time.Minute, Offset: -20 * time.Minute}),
+       g.Entry("group and mean", helpers.Args{Input: "group_mean", Duration: 
25 * time.Minute, Offset: -20 * time.Minute}),
        g.Entry("group without field", helpers.Args{Input: "group_no_field", 
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
        g.Entry("top 2 by id", helpers.Args{Input: "top", Duration: 25 * 
time.Minute, Offset: -20 * time.Minute}),
        g.Entry("bottom 2 by id", helpers.Args{Input: "bottom", Duration: 25 * 
time.Minute, Offset: -20 * time.Minute}),


Reply via email to