This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 0cfb825d Push down min/max aggregation operation to data nodes (#694)
0cfb825d is described below
commit 0cfb825d5410dc087fe47f656b56efcabfe0059d
Author: hui lai <[email protected]>
AuthorDate: Thu Aug 7 13:10:17 2025 +0800
Push down min/max aggregation operation to data nodes (#694)
* push down aggregation to data nodes
---------
Co-authored-by: Gao Hongtao <[email protected]>
---
CHANGES.md | 1 +
pkg/query/logical/measure/measure_analyzer.go | 9 +-
.../logical/measure/measure_plan_distributed.go | 154 +++++++++++++++++----
test/cases/measure/data/input/group_min.yaml | 34 +++++
test/cases/measure/data/want/group_min.yaml | 54 ++++++++
test/cases/measure/measure.go | 1 +
6 files changed, 228 insertions(+), 25 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 1e2de1ed..07d19936 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -22,6 +22,7 @@ Release Notes.
- Data Model: Introduce the `Trace` data model to store the trace/span data.
- Support dictionary encoding for low cardinality columns.
- Push down aggregation for topN query.
+- Push down min/max aggregation to data nodes.
- Introduce write queue mechanism in liaison nodes to efficiently synchronize
stream and measure partition folders, improving write throughput and consistency
- Add trace module metadata management.
- Add chunked data sync to improve memory efficiency and performance during
data transfer operations, supporting configurable chunk sizes, retry
mechanisms, and out-of-order handling for both measure and stream services.
diff --git a/pkg/query/logical/measure/measure_analyzer.go
b/pkg/query/logical/measure/measure_analyzer.go
index 5eb69674..d77ea8ea 100644
--- a/pkg/query/logical/measure/measure_analyzer.go
+++ b/pkg/query/logical/measure/measure_analyzer.go
@@ -24,6 +24,7 @@ import (
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
measurev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+ modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/pkg/query/executor"
"github.com/apache/skywalking-banyandb/pkg/query/logical"
)
@@ -143,8 +144,14 @@ func DistributedAnalyze(criteria *measurev1.QueryRequest,
ss []logical.Schema) (
}
}
+ // TODO: support all aggregation functions
+ needCompletePushDownAgg := criteria.GetAgg() != nil &&
+ (criteria.GetAgg().GetFunction() ==
modelv1.AggregationFunction_AGGREGATION_FUNCTION_MAX ||
+ criteria.GetAgg().GetFunction() ==
modelv1.AggregationFunction_AGGREGATION_FUNCTION_MIN) &&
+ criteria.GetTop() == nil
+
// parse fields
- plan := newUnresolvedDistributed(criteria)
+ plan := newUnresolvedDistributed(criteria, needCompletePushDownAgg)
// parse limit and offset
limitParameter := criteria.GetLimit()
diff --git a/pkg/query/logical/measure/measure_plan_distributed.go
b/pkg/query/logical/measure/measure_plan_distributed.go
index e7dafb7a..10d25248 100644
--- a/pkg/query/logical/measure/measure_plan_distributed.go
+++ b/pkg/query/logical/measure/measure_plan_distributed.go
@@ -28,6 +28,7 @@ import (
"google.golang.org/protobuf/proto"
"github.com/apache/skywalking-banyandb/api/data"
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
measurev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/pkg/bus"
@@ -44,14 +45,72 @@ const defaultQueryTimeout = 15 * time.Second
var _ logical.UnresolvedPlan = (*unresolvedDistributed)(nil)
+type pushDownAggSchema struct {
+ originalSchema logical.Schema
+ aggregationField *logical.Field
+}
+
+func (as *pushDownAggSchema) CreateFieldRef(fields ...*logical.Field)
([]*logical.FieldRef, error) {
+ originalRefs, err := as.originalSchema.CreateFieldRef(fields...)
+ if err != nil {
+ return nil, err
+ }
+ for _, ref := range originalRefs {
+ if ref.Spec != nil {
+ ref.Spec.FieldIdx = 0
+ }
+ }
+ return originalRefs, nil
+}
+
+func (as *pushDownAggSchema) CreateTagRef(tags ...[]*logical.Tag)
([][]*logical.TagRef, error) {
+ return as.originalSchema.CreateTagRef(tags...)
+}
+
+func (as *pushDownAggSchema) ProjTags(refs ...[]*logical.TagRef)
logical.Schema {
+ return &pushDownAggSchema{
+ originalSchema: as.originalSchema.ProjTags(refs...),
+ aggregationField: as.aggregationField,
+ }
+}
+
+func (as *pushDownAggSchema) ProjFields(fieldRefs ...*logical.FieldRef)
logical.Schema {
+ return &pushDownAggSchema{
+ originalSchema: as.originalSchema.ProjFields(fieldRefs...),
+ aggregationField: as.aggregationField,
+ }
+}
+
+func (as *pushDownAggSchema) FindTagSpecByName(name string) *logical.TagSpec {
+ return as.originalSchema.FindTagSpecByName(name)
+}
+
+func (as *pushDownAggSchema) IndexDefined(tagName string) (bool,
*databasev1.IndexRule) {
+ return as.originalSchema.IndexDefined(tagName)
+}
+
+func (as *pushDownAggSchema) IndexRuleDefined(ruleName string) (bool,
*databasev1.IndexRule) {
+ return as.originalSchema.IndexRuleDefined(ruleName)
+}
+
+func (as *pushDownAggSchema) EntityList() []string {
+ return as.originalSchema.EntityList()
+}
+
+func (as *pushDownAggSchema) Children() []logical.Schema {
+ return as.originalSchema.Children()
+}
+
type unresolvedDistributed struct {
- originalQuery *measurev1.QueryRequest
- groupByEntity bool
+ originalQuery *measurev1.QueryRequest
+ groupByEntity bool
+ needCompletePushDownAgg bool
}
-func newUnresolvedDistributed(query *measurev1.QueryRequest)
logical.UnresolvedPlan {
+func newUnresolvedDistributed(query *measurev1.QueryRequest,
needCompletePushDownAgg bool) logical.UnresolvedPlan {
return &unresolvedDistributed{
- originalQuery: query,
+ originalQuery: query,
+ needCompletePushDownAgg: needCompletePushDownAgg,
}
}
@@ -90,6 +149,10 @@ func (ud *unresolvedDistributed) Analyze(s logical.Schema)
(logical.Plan, error)
Limit: limit + ud.originalQuery.Offset,
OrderBy: ud.originalQuery.OrderBy,
}
+ if ud.needCompletePushDownAgg {
+ temp.GroupBy = ud.originalQuery.GroupBy
+ temp.Agg = ud.originalQuery.Agg
+ }
// push down groupBy, agg and top to data node and rewrite agg result
to raw data
if ud.originalQuery.Agg != nil && ud.originalQuery.Top != nil {
temp.RewriteAggTopNResult = true
@@ -104,10 +167,11 @@ func (ud *unresolvedDistributed) Analyze(s
logical.Schema) (logical.Plan, error)
return nil, fmt.Errorf("entity tag %s not found", e)
}
result := &distributedPlan{
- queryTemplate: temp,
- s: s,
- sortByTime: false,
- sortTagSpec: *sortTagSpec,
+ queryTemplate: temp,
+ s: s,
+ sortByTime: false,
+ sortTagSpec: *sortTagSpec,
+ needCompletePushDownAgg: ud.needCompletePushDownAgg,
}
if ud.originalQuery.OrderBy != nil &&
ud.originalQuery.OrderBy.Sort == modelv1.Sort_SORT_DESC {
result.desc = true
@@ -116,16 +180,18 @@ func (ud *unresolvedDistributed) Analyze(s
logical.Schema) (logical.Plan, error)
}
if ud.originalQuery.OrderBy == nil {
return &distributedPlan{
- queryTemplate: temp,
- s: s,
- sortByTime: true,
+ queryTemplate: temp,
+ s: s,
+ sortByTime: true,
+ needCompletePushDownAgg: ud.needCompletePushDownAgg,
}, nil
}
if ud.originalQuery.OrderBy.IndexRuleName == "" {
result := &distributedPlan{
- queryTemplate: temp,
- s: s,
- sortByTime: true,
+ queryTemplate: temp,
+ s: s,
+ sortByTime: true,
+ needCompletePushDownAgg: ud.needCompletePushDownAgg,
}
if ud.originalQuery.OrderBy.Sort == modelv1.Sort_SORT_DESC {
result.desc = true
@@ -144,10 +210,11 @@ func (ud *unresolvedDistributed) Analyze(s
logical.Schema) (logical.Plan, error)
return nil, fmt.Errorf("tag %s not found", indexRule.Tags[0])
}
result := &distributedPlan{
- queryTemplate: temp,
- s: s,
- sortByTime: false,
- sortTagSpec: *sortTagSpec,
+ queryTemplate: temp,
+ s: s,
+ sortByTime: false,
+ sortTagSpec: *sortTagSpec,
+ needCompletePushDownAgg: ud.needCompletePushDownAgg,
}
if ud.originalQuery.OrderBy.Sort == modelv1.Sort_SORT_DESC {
result.desc = true
@@ -156,12 +223,13 @@ func (ud *unresolvedDistributed) Analyze(s
logical.Schema) (logical.Plan, error)
}
type distributedPlan struct {
- s logical.Schema
- queryTemplate *measurev1.QueryRequest
- sortTagSpec logical.TagSpec
- sortByTime bool
- desc bool
- maxDataPointsSize uint32
+ s logical.Schema
+ queryTemplate *measurev1.QueryRequest
+ sortTagSpec logical.TagSpec
+ maxDataPointsSize uint32
+ sortByTime bool
+ desc bool
+ needCompletePushDownAgg bool
}
func (t *distributedPlan) Execute(ctx context.Context) (mi executor.MIterator,
err error) {
@@ -193,6 +261,7 @@ func (t *distributedPlan) Execute(ctx context.Context) (mi
executor.MIterator, e
return nil, err
}
var see []sort.Iterator[*comparableDataPoint]
+ var pushedDownAggDps []*measurev1.DataPoint
for _, f := range ff {
if m, getErr := f.Get(); getErr != nil {
err = multierr.Append(err, getErr)
@@ -205,11 +274,18 @@ func (t *distributedPlan) Execute(ctx context.Context)
(mi executor.MIterator, e
if span != nil {
span.AddSubTrace(resp.Trace)
}
+ if t.needCompletePushDownAgg {
+ pushedDownAggDps = append(pushedDownAggDps,
resp.DataPoints...)
+ continue
+ }
see = append(see,
newSortableElements(resp.DataPoints,
t.sortByTime, t.sortTagSpec))
}
}
+ if t.needCompletePushDownAgg {
+ return &pushedDownAggregatedIterator{dataPoints:
pushedDownAggDps}, err
+ }
smi := &sortedMIterator{
Iterator: sort.NewItemIter(see, t.desc),
}
@@ -226,6 +302,12 @@ func (t *distributedPlan) Children() []logical.Plan {
}
func (t *distributedPlan) Schema() logical.Schema {
+ if t.needCompletePushDownAgg {
+ return &pushDownAggSchema{
+ originalSchema: t.s,
+ aggregationField:
logical.NewField(t.queryTemplate.Agg.FieldName),
+ }
+ }
return t.s
}
@@ -399,3 +481,27 @@ func hashDataPoint(dp *measurev1.DataPoint) uint64 {
h = (h ^ uint64(dp.Timestamp.Nanos)) * prime64
return h
}
+
+type pushedDownAggregatedIterator struct {
+ dataPoints []*measurev1.DataPoint
+ index int
+}
+
+func (s *pushedDownAggregatedIterator) Next() bool {
+ if s.index >= len(s.dataPoints) {
+ return false
+ }
+ s.index++
+ return true
+}
+
+func (s *pushedDownAggregatedIterator) Current() []*measurev1.DataPoint {
+ if s.index == 0 || s.index > len(s.dataPoints) {
+ return nil
+ }
+ return []*measurev1.DataPoint{s.dataPoints[s.index-1]}
+}
+
+func (s *pushedDownAggregatedIterator) Close() error {
+ return nil
+}
diff --git a/test/cases/measure/data/input/group_min.yaml
b/test/cases/measure/data/input/group_min.yaml
new file mode 100644
index 00000000..f5b2b597
--- /dev/null
+++ b/test/cases/measure/data/input/group_min.yaml
@@ -0,0 +1,34 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "service_cpm_minute"
+groups: ["sw_metric"]
+tagProjection:
+ tagFamilies:
+ - name: "default"
+ tags: ["id"]
+fieldProjection:
+ names: ["total", "value"]
+groupBy:
+ tagProjection:
+ tagFamilies:
+ - name: "default"
+ tags: ["id"]
+ fieldName: "value"
+agg:
+ function: "AGGREGATION_FUNCTION_MIN"
+ fieldName: "value"
\ No newline at end of file
diff --git a/test/cases/measure/data/want/group_min.yaml
b/test/cases/measure/data/want/group_min.yaml
new file mode 100644
index 00000000..52a4fb46
--- /dev/null
+++ b/test/cases/measure/data/want/group_min.yaml
@@ -0,0 +1,54 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+dataPoints:
+- fields:
+ - name: value
+ value:
+ int:
+ value: "1"
+ tagFamilies:
+ - name: default
+ tags:
+ - key: id
+ value:
+ str:
+ value: svc1
+- fields:
+ - name: value
+ value:
+ int:
+ value: "4"
+ tagFamilies:
+ - name: default
+ tags:
+ - key: id
+ value:
+ str:
+ value: svc2
+- fields:
+ - name: value
+ value:
+ int:
+ value: "6"
+ tagFamilies:
+ - name: default
+ tags:
+ - key: id
+ value:
+ str:
+ value: svc3
\ No newline at end of file
diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go
index a7ec3979..aeacde4f 100644
--- a/test/cases/measure/measure.go
+++ b/test/cases/measure/measure.go
@@ -47,6 +47,7 @@ var _ = g.DescribeTable("Scanning Measures", verify,
g.Entry("filter by a integer tag", helpers.Args{Input:
"tag_filter_int", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
g.Entry("filter by an unknown tag", helpers.Args{Input:
"tag_filter_unknown", Duration: 25 * time.Minute, Offset: -20 * time.Minute,
WantEmpty: true}),
g.Entry("group and max", helpers.Args{Input: "group_max", Duration: 25
* time.Minute, Offset: -20 * time.Minute}),
+ g.Entry("group and min", helpers.Args{Input: "group_min", Duration: 25
* time.Minute, Offset: -20 * time.Minute}),
g.Entry("group without field", helpers.Args{Input: "group_no_field",
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
g.Entry("top 2 by id", helpers.Args{Input: "top", Duration: 25 *
time.Minute, Offset: -20 * time.Minute}),
g.Entry("bottom 2 by id", helpers.Args{Input: "bottom", Duration: 25 *
time.Minute, Offset: -20 * time.Minute}),