hanahmily commented on code in PR #957:
URL:
https://github.com/apache/skywalking-banyandb/pull/957#discussion_r2750587188
##########
pkg/query/logical/measure/measure_analyzer.go:
##########
@@ -119,10 +125,18 @@ func Analyze(criteria *measurev1.QueryRequest, metadata
[]*commonv1.Metadata, ss
}
if criteria.GetAgg() != nil {
+ // Check if this is a distributed mean aggregation that needs
to return sum and count
+ // This happens when the query is pushed down from liaison node
to data node
+ isDistributedMean := false
+ if isDistributed && criteria.GetAgg().GetFunction() ==
modelv1.AggregationFunction_AGGREGATION_FUNCTION_MEAN &&
+ criteria.GetTop() == nil {
Review Comment:
The Top operation should also work correctly with distributed aggregation, so it should not be excluded by the `criteria.GetTop() == nil` check.
##########
pkg/query/logical/measure/measure_plan_aggregation.go:
##########
@@ -104,9 +127,39 @@ func newAggregationPlan[N aggregation.Number](gba
*unresolvedAggregation, prevPl
aggrFunc: aggrFunc,
aggregationFieldRef: fieldRef,
isGroup: gba.isGroup,
+ isDistributedMean: gba.isDistributedMean,
}, nil
}
+type distributedMeanFunc[N aggregation.Number] struct {
Review Comment:
Move this implementation to the `aggregation` package.
##########
pkg/query/logical/measure/measure_analyzer.go:
##########
@@ -119,10 +125,18 @@ func Analyze(criteria *measurev1.QueryRequest, metadata
[]*commonv1.Metadata, ss
}
if criteria.GetAgg() != nil {
+ // Check if this is a distributed mean aggregation that needs
to return sum and count
+ // This happens when the query is pushed down from liaison node
to data node
+ isDistributedMean := false
+ if isDistributed && criteria.GetAgg().GetFunction() ==
modelv1.AggregationFunction_AGGREGATION_FUNCTION_MEAN &&
+ criteria.GetTop() == nil {
+ isDistributedMean = true
+ }
plan = newUnresolvedAggregation(plan,
logical.NewField(criteria.GetAgg().GetFieldName()),
criteria.GetAgg().GetFunction(),
criteria.GetGroupBy() != nil,
+ isDistributedMean,
Review Comment:
```suggestion
```
Pass the distributed mean function through the third parameter (the aggregation
function argument) instead of adding an extra flag.
##########
pkg/query/logical/measure/measure_plan_aggregation.go:
##########
@@ -86,14 +94,29 @@ type aggregationPlan[N aggregation.Number] struct {
aggrFunc aggregation.Func[N]
aggrType modelv1.AggregationFunction
isGroup bool
+ isDistributedMean bool
+}
+
+// DistributedMeanFunc is used for distributed mean aggregation, returning sum
and count.
+type DistributedMeanFunc[N aggregation.Number] interface {
Review Comment:
Modify the existing `Func` interface in `aggregation.go` instead of creating a
new one.
##########
pkg/query/logical/measure/measure_analyzer.go:
##########
@@ -175,22 +183,23 @@ func DistributedAnalyze(criteria *measurev1.QueryRequest,
ss []logical.Schema) (
}
pushedLimit := int(limitParameter + criteria.GetOffset())
- if criteria.GetGroupBy() != nil {
- plan = newUnresolvedGroupBy(plan, groupByTags, false)
- pushedLimit = math.MaxInt
- }
+ // When needCompletePushDownAgg is true, aggregation is already done on
data nodes,
+ // so we should not create groupBy and aggregation plans on liaison node
Review Comment:
This assumption is not correct. Aggregation should be performed on both the
liaison node and the data nodes, so we should still create the group-by and
aggregation plans for the liaison node.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]