This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 0cfb825d Push down min/max aggregation operation to data nodes (#694)
0cfb825d is described below

commit 0cfb825d5410dc087fe47f656b56efcabfe0059d
Author: hui lai <[email protected]>
AuthorDate: Thu Aug 7 13:10:17 2025 +0800

    Push down min/max aggregation operation to data nodes (#694)
    
    * push down aggregation to data nodes
    
    
    
    ---------
    
    Co-authored-by: Gao Hongtao <[email protected]>
---
 CHANGES.md                                         |   1 +
 pkg/query/logical/measure/measure_analyzer.go      |   9 +-
 .../logical/measure/measure_plan_distributed.go    | 154 +++++++++++++++++----
 test/cases/measure/data/input/group_min.yaml       |  34 +++++
 test/cases/measure/data/want/group_min.yaml        |  54 ++++++++
 test/cases/measure/measure.go                      |   1 +
 6 files changed, 228 insertions(+), 25 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 1e2de1ed..07d19936 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -22,6 +22,7 @@ Release Notes.
 - Data Model: Introduce the `Trace` data model to store the trace/span data.
 - Support dictionary encoding for low cardinality columns.
 - Push down aggregation for topN query.
+- Push down min/max aggregation to data nodes
 - Introduce write queue mechanism in liaison nodes to efficiently synchronize stream and measure partition folders, improving write throughput and consistency
 - Add trace module metadata management.
 - Add chunked data sync to improve memory efficiency and performance during data transfer operations, supporting configurable chunk sizes, retry mechanisms, and out-of-order handling for both measure and stream services.
diff --git a/pkg/query/logical/measure/measure_analyzer.go b/pkg/query/logical/measure/measure_analyzer.go
index 5eb69674..d77ea8ea 100644
--- a/pkg/query/logical/measure/measure_analyzer.go
+++ b/pkg/query/logical/measure/measure_analyzer.go
@@ -24,6 +24,7 @@ import (
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/pkg/query/executor"
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
 )
@@ -143,8 +144,14 @@ func DistributedAnalyze(criteria *measurev1.QueryRequest, 
ss []logical.Schema) (
                }
        }
 
+       // TODO: to support all aggregation functions
+       needCompletePushDownAgg := criteria.GetAgg() != nil &&
+               (criteria.GetAgg().GetFunction() == 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_MAX ||
+                       criteria.GetAgg().GetFunction() == 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_MIN) &&
+               criteria.GetTop() == nil
+
        // parse fields
-       plan := newUnresolvedDistributed(criteria)
+       plan := newUnresolvedDistributed(criteria, needCompletePushDownAgg)
 
        // parse limit and offset
        limitParameter := criteria.GetLimit()
diff --git a/pkg/query/logical/measure/measure_plan_distributed.go b/pkg/query/logical/measure/measure_plan_distributed.go
index e7dafb7a..10d25248 100644
--- a/pkg/query/logical/measure/measure_plan_distributed.go
+++ b/pkg/query/logical/measure/measure_plan_distributed.go
@@ -28,6 +28,7 @@ import (
        "google.golang.org/protobuf/proto"
 
        "github.com/apache/skywalking-banyandb/api/data"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/pkg/bus"
@@ -44,14 +45,72 @@ const defaultQueryTimeout = 15 * time.Second
 
 var _ logical.UnresolvedPlan = (*unresolvedDistributed)(nil)
 
+type pushDownAggSchema struct {
+       originalSchema   logical.Schema
+       aggregationField *logical.Field
+}
+
+func (as *pushDownAggSchema) CreateFieldRef(fields ...*logical.Field) 
([]*logical.FieldRef, error) {
+       originalRefs, err := as.originalSchema.CreateFieldRef(fields...)
+       if err != nil {
+               return nil, err
+       }
+       for _, ref := range originalRefs {
+               if ref.Spec != nil {
+                       ref.Spec.FieldIdx = 0
+               }
+       }
+       return originalRefs, nil
+}
+
+func (as *pushDownAggSchema) CreateTagRef(tags ...[]*logical.Tag) 
([][]*logical.TagRef, error) {
+       return as.originalSchema.CreateTagRef(tags...)
+}
+
+func (as *pushDownAggSchema) ProjTags(refs ...[]*logical.TagRef) 
logical.Schema {
+       return &pushDownAggSchema{
+               originalSchema:   as.originalSchema.ProjTags(refs...),
+               aggregationField: as.aggregationField,
+       }
+}
+
+func (as *pushDownAggSchema) ProjFields(fieldRefs ...*logical.FieldRef) 
logical.Schema {
+       return &pushDownAggSchema{
+               originalSchema:   as.originalSchema.ProjFields(fieldRefs...),
+               aggregationField: as.aggregationField,
+       }
+}
+
+func (as *pushDownAggSchema) FindTagSpecByName(name string) *logical.TagSpec {
+       return as.originalSchema.FindTagSpecByName(name)
+}
+
+func (as *pushDownAggSchema) IndexDefined(tagName string) (bool, 
*databasev1.IndexRule) {
+       return as.originalSchema.IndexDefined(tagName)
+}
+
+func (as *pushDownAggSchema) IndexRuleDefined(ruleName string) (bool, 
*databasev1.IndexRule) {
+       return as.originalSchema.IndexRuleDefined(ruleName)
+}
+
+func (as *pushDownAggSchema) EntityList() []string {
+       return as.originalSchema.EntityList()
+}
+
+func (as *pushDownAggSchema) Children() []logical.Schema {
+       return as.originalSchema.Children()
+}
+
 type unresolvedDistributed struct {
-       originalQuery *measurev1.QueryRequest
-       groupByEntity bool
+       originalQuery           *measurev1.QueryRequest
+       groupByEntity           bool
+       needCompletePushDownAgg bool
 }
 
-func newUnresolvedDistributed(query *measurev1.QueryRequest) 
logical.UnresolvedPlan {
+func newUnresolvedDistributed(query *measurev1.QueryRequest, 
needCompletePushDownAgg bool) logical.UnresolvedPlan {
        return &unresolvedDistributed{
-               originalQuery: query,
+               originalQuery:           query,
+               needCompletePushDownAgg: needCompletePushDownAgg,
        }
 }
 
@@ -90,6 +149,10 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema) 
(logical.Plan, error)
                Limit:           limit + ud.originalQuery.Offset,
                OrderBy:         ud.originalQuery.OrderBy,
        }
+       if ud.needCompletePushDownAgg {
+               temp.GroupBy = ud.originalQuery.GroupBy
+               temp.Agg = ud.originalQuery.Agg
+       }
        // push down groupBy, agg and top to data node and rewrite agg result 
to raw data
        if ud.originalQuery.Agg != nil && ud.originalQuery.Top != nil {
                temp.RewriteAggTopNResult = true
@@ -104,10 +167,11 @@ func (ud *unresolvedDistributed) Analyze(s 
logical.Schema) (logical.Plan, error)
                        return nil, fmt.Errorf("entity tag %s not found", e)
                }
                result := &distributedPlan{
-                       queryTemplate: temp,
-                       s:             s,
-                       sortByTime:    false,
-                       sortTagSpec:   *sortTagSpec,
+                       queryTemplate:           temp,
+                       s:                       s,
+                       sortByTime:              false,
+                       sortTagSpec:             *sortTagSpec,
+                       needCompletePushDownAgg: ud.needCompletePushDownAgg,
                }
                if ud.originalQuery.OrderBy != nil && 
ud.originalQuery.OrderBy.Sort == modelv1.Sort_SORT_DESC {
                        result.desc = true
@@ -116,16 +180,18 @@ func (ud *unresolvedDistributed) Analyze(s 
logical.Schema) (logical.Plan, error)
        }
        if ud.originalQuery.OrderBy == nil {
                return &distributedPlan{
-                       queryTemplate: temp,
-                       s:             s,
-                       sortByTime:    true,
+                       queryTemplate:           temp,
+                       s:                       s,
+                       sortByTime:              true,
+                       needCompletePushDownAgg: ud.needCompletePushDownAgg,
                }, nil
        }
        if ud.originalQuery.OrderBy.IndexRuleName == "" {
                result := &distributedPlan{
-                       queryTemplate: temp,
-                       s:             s,
-                       sortByTime:    true,
+                       queryTemplate:           temp,
+                       s:                       s,
+                       sortByTime:              true,
+                       needCompletePushDownAgg: ud.needCompletePushDownAgg,
                }
                if ud.originalQuery.OrderBy.Sort == modelv1.Sort_SORT_DESC {
                        result.desc = true
@@ -144,10 +210,11 @@ func (ud *unresolvedDistributed) Analyze(s 
logical.Schema) (logical.Plan, error)
                return nil, fmt.Errorf("tag %s not found", indexRule.Tags[0])
        }
        result := &distributedPlan{
-               queryTemplate: temp,
-               s:             s,
-               sortByTime:    false,
-               sortTagSpec:   *sortTagSpec,
+               queryTemplate:           temp,
+               s:                       s,
+               sortByTime:              false,
+               sortTagSpec:             *sortTagSpec,
+               needCompletePushDownAgg: ud.needCompletePushDownAgg,
        }
        if ud.originalQuery.OrderBy.Sort == modelv1.Sort_SORT_DESC {
                result.desc = true
@@ -156,12 +223,13 @@ func (ud *unresolvedDistributed) Analyze(s 
logical.Schema) (logical.Plan, error)
 }
 
 type distributedPlan struct {
-       s                 logical.Schema
-       queryTemplate     *measurev1.QueryRequest
-       sortTagSpec       logical.TagSpec
-       sortByTime        bool
-       desc              bool
-       maxDataPointsSize uint32
+       s                       logical.Schema
+       queryTemplate           *measurev1.QueryRequest
+       sortTagSpec             logical.TagSpec
+       maxDataPointsSize       uint32
+       sortByTime              bool
+       desc                    bool
+       needCompletePushDownAgg bool
 }
 
 func (t *distributedPlan) Execute(ctx context.Context) (mi executor.MIterator, 
err error) {
@@ -193,6 +261,7 @@ func (t *distributedPlan) Execute(ctx context.Context) (mi 
executor.MIterator, e
                return nil, err
        }
        var see []sort.Iterator[*comparableDataPoint]
+       var pushedDownAggDps []*measurev1.DataPoint
        for _, f := range ff {
                if m, getErr := f.Get(); getErr != nil {
                        err = multierr.Append(err, getErr)
@@ -205,11 +274,18 @@ func (t *distributedPlan) Execute(ctx context.Context) 
(mi executor.MIterator, e
                        if span != nil {
                                span.AddSubTrace(resp.Trace)
                        }
+                       if t.needCompletePushDownAgg {
+                               pushedDownAggDps = append(pushedDownAggDps, 
resp.DataPoints...)
+                               continue
+                       }
                        see = append(see,
                                newSortableElements(resp.DataPoints,
                                        t.sortByTime, t.sortTagSpec))
                }
        }
+       if t.needCompletePushDownAgg {
+               return &pushedDownAggregatedIterator{dataPoints: 
pushedDownAggDps}, err
+       }
        smi := &sortedMIterator{
                Iterator: sort.NewItemIter(see, t.desc),
        }
@@ -226,6 +302,12 @@ func (t *distributedPlan) Children() []logical.Plan {
 }
 
 func (t *distributedPlan) Schema() logical.Schema {
+       if t.needCompletePushDownAgg {
+               return &pushDownAggSchema{
+                       originalSchema:   t.s,
+                       aggregationField: 
logical.NewField(t.queryTemplate.Agg.FieldName),
+               }
+       }
        return t.s
 }
 
@@ -399,3 +481,27 @@ func hashDataPoint(dp *measurev1.DataPoint) uint64 {
        h = (h ^ uint64(dp.Timestamp.Nanos)) * prime64
        return h
 }
+
+type pushedDownAggregatedIterator struct {
+       dataPoints []*measurev1.DataPoint
+       index      int
+}
+
+func (s *pushedDownAggregatedIterator) Next() bool {
+       if s.index >= len(s.dataPoints) {
+               return false
+       }
+       s.index++
+       return true
+}
+
+func (s *pushedDownAggregatedIterator) Current() []*measurev1.DataPoint {
+       if s.index == 0 || s.index > len(s.dataPoints) {
+               return nil
+       }
+       return []*measurev1.DataPoint{s.dataPoints[s.index-1]}
+}
+
+func (s *pushedDownAggregatedIterator) Close() error {
+       return nil
+}
diff --git a/test/cases/measure/data/input/group_min.yaml b/test/cases/measure/data/input/group_min.yaml
new file mode 100644
index 00000000..f5b2b597
--- /dev/null
+++ b/test/cases/measure/data/input/group_min.yaml
@@ -0,0 +1,34 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "service_cpm_minute"
+groups: ["sw_metric"]
+tagProjection:
+  tagFamilies:
+  - name: "default"
+    tags: ["id"]
+fieldProjection:
+  names: ["total", "value"]
+groupBy:
+  tagProjection:
+    tagFamilies:
+    - name: "default"
+      tags: ["id"]
+  fieldName: "value"
+agg:
+  function: "AGGREGATION_FUNCTION_MIN"
+  fieldName: "value"
\ No newline at end of file
diff --git a/test/cases/measure/data/want/group_min.yaml b/test/cases/measure/data/want/group_min.yaml
new file mode 100644
index 00000000..52a4fb46
--- /dev/null
+++ b/test/cases/measure/data/want/group_min.yaml
@@ -0,0 +1,54 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+dataPoints:
+- fields:
+  - name: value
+    value:
+      int:
+        value: "1"
+  tagFamilies:
+  - name: default
+    tags:
+    - key: id
+      value:
+        str:
+          value: svc1
+- fields:
+  - name: value
+    value:
+      int:
+        value: "4"
+  tagFamilies:
+  - name: default
+    tags:
+    - key: id
+      value:
+        str:
+          value: svc2
+- fields:
+  - name: value
+    value:
+      int:
+        value: "6"
+  tagFamilies:
+  - name: default
+    tags:
+    - key: id
+      value:
+        str:
+          value: svc3
\ No newline at end of file
diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go
index a7ec3979..aeacde4f 100644
--- a/test/cases/measure/measure.go
+++ b/test/cases/measure/measure.go
@@ -47,6 +47,7 @@ var _ = g.DescribeTable("Scanning Measures", verify,
        g.Entry("filter by a integer tag", helpers.Args{Input: 
"tag_filter_int", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
        g.Entry("filter by an unknown tag", helpers.Args{Input: 
"tag_filter_unknown", Duration: 25 * time.Minute, Offset: -20 * time.Minute, 
WantEmpty: true}),
        g.Entry("group and max", helpers.Args{Input: "group_max", Duration: 25 
* time.Minute, Offset: -20 * time.Minute}),
+       g.Entry("group and min", helpers.Args{Input: "group_min", Duration: 25 
* time.Minute, Offset: -20 * time.Minute}),
        g.Entry("group without field", helpers.Args{Input: "group_no_field", 
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
        g.Entry("top 2 by id", helpers.Args{Input: "top", Duration: 25 * 
time.Minute, Offset: -20 * time.Minute}),
        g.Entry("bottom 2 by id", helpers.Args{Input: "bottom", Duration: 25 * 
time.Minute, Offset: -20 * time.Minute}),

Reply via email to