hanahmily opened a new issue, #13694: URL: https://github.com/apache/skywalking/issues/13694
### Search before asking

- [x] I had searched in the [issues](https://github.com/apache/skywalking/issues?q=is%3Aissue) and found no similar feature requirement.

### Description

## Current Architecture

The `Func[N]` interface (`In`, `Val`, `Reset`) is used as a monolithic accumulator everywhere:

- Data node aggregation plans (via `Analyze`)
- Liaison node aggregation plans (via `DistributedAnalyze`)
- TopN post-processor

The distributed push-down is handled with hacks:

- `needCompletePushDownAgg` only supports MAX/MIN/SUM/COUNT (not MEAN)
- COUNT is silently converted to SUM at the liaison ([measure_analyzer.go:185-186](pkg/query/logical/measure/measure_analyzer.go))
- The liaison only deduplicates push-down results rather than properly reducing them

## New Interface Design

### Intermediate type in [aggregation.go](pkg/query/aggregation/aggregation.go)

```go
// Partial represents the intermediate result of a Map phase.
// For most functions, only Value is meaningful.
// For MEAN, both Value (sum) and Count are used.
type Partial[N Number] struct {
	Value N
	Count N
}
```

### Map interface (replaces `Func`; used by data node, standalone, and TopN)

```go
// Map accumulates raw values and produces aggregation results.
// It serves as the local accumulator for raw data points.
type Map[N Number] interface {
	// In feeds a raw value into the accumulator.
	In(N)
	// Val returns the locally finalized aggregation result.
	// For MEAN, this computes sum/count. For others, same as Partial().Value.
	Val() N
	// Partial returns the intermediate result for the reduce phase.
	Partial() Partial[N]
	// Reset clears the accumulator for reuse.
	Reset()
}
```

### Reduce interface (new, for liaison node use)

```go
// Reduce combines intermediate results from Map phases into a final value.
type Reduce[N Number] interface {
	// Combine feeds an intermediate result from a Map phase.
	Combine(Partial[N])
	// Val returns the final aggregated value.
	Val() N
	// Reset clears the accumulator for reuse.
	Reset()
}
```

### Factory functions

```go
func NewMap[N Number](af modelv1.AggregationFunction) (Map[N], error)
func NewReduce[N Number](af modelv1.AggregationFunction) (Reduce[N], error)
```

## Concrete Implementations in [function.go](pkg/query/aggregation/function.go)

### SUM

- **Map**: Accumulates a running sum. `Partial()` returns `{Value: sum}`.
- **Reduce**: Sums incoming `Value` fields. `Val()` returns the total sum.

### COUNT

- **Map**: Increments a counter. `Partial()` returns `{Value: count}`.
- **Reduce**: Sums incoming `Value` fields (same logic as SUM reduce). `Val()` returns the total count.

### MAX

- **Map**: Tracks the local maximum. `Partial()` returns `{Value: max}`.
- **Reduce**: Tracks the maximum of incoming `Value` fields. `Val()` returns the global max.

### MIN

- **Map**: Tracks the local minimum. `Partial()` returns `{Value: min}`.
- **Reduce**: Tracks the minimum of incoming `Value` fields. `Val()` returns the global min.

### MEAN (the key case this enables)

- **Map**: Tracks sum and count. `Partial()` returns `{Value: sum, Count: count}`.
- **Reduce**: Accumulates the total sum and total count from incoming partials. `Val()` returns `totalSum / totalCount`.

## Serialization Helpers in [aggregation.go](pkg/query/aggregation/aggregation.go)

```go
// PartialToFieldValues converts a Partial to field values for wire transport.
func PartialToFieldValues[N Number](p Partial[N]) ([]*modelv1.FieldValue, error)

// FieldValuesToPartial converts field values from wire transport to a Partial.
func FieldValuesToPartial[N Number](fvs []*modelv1.FieldValue) (Partial[N], error)
```

For non-MEAN functions, this produces a single `FieldValue`. For MEAN, it produces two (sum and count).
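The MEAN and COUNT behaviour described above can be illustrated with a small, self-contained sketch. It is not the code in `function.go`: `Number` is a stand-in for the repository's numeric constraint, and `meanMap`, `meanReduce`, `countMap`, `sumReduce`, `safeDiv`, and `mapThenReduce` are hypothetical names chosen for this example. The zero-count guard is an assumption of the sketch, not something the proposal specifies.

```go
package main

import "fmt"

// Number is a hypothetical stand-in for the repository's numeric constraint.
type Number interface {
	~int64 | ~float64
}

// Partial, Map, and Reduce mirror the proposed types.
type Partial[N Number] struct {
	Value N
	Count N
}

type Map[N Number] interface {
	In(N)
	Val() N
	Partial() Partial[N]
	Reset()
}

type Reduce[N Number] interface {
	Combine(Partial[N])
	Val() N
	Reset()
}

// meanMap (hypothetical) tracks a local sum and count; Partial exposes both.
type meanMap[N Number] struct{ sum, count N }

func (m *meanMap[N]) In(v N)              { m.sum += v; m.count++ }
func (m *meanMap[N]) Partial() Partial[N] { return Partial[N]{Value: m.sum, Count: m.count} }
func (m *meanMap[N]) Reset()              { m.sum, m.count = 0, 0 }
func (m *meanMap[N]) Val() N              { return safeDiv(m.sum, m.count) }

// meanReduce (hypothetical) adds up sums and counts before dividing once.
type meanReduce[N Number] struct{ sum, count N }

func (r *meanReduce[N]) Combine(p Partial[N]) { r.sum += p.Value; r.count += p.Count }
func (r *meanReduce[N]) Reset()               { r.sum, r.count = 0, 0 }
func (r *meanReduce[N]) Val() N               { return safeDiv(r.sum, r.count) }

// countMap (hypothetical) increments once per raw value, ignoring the value.
type countMap[N Number] struct{ count N }

func (m *countMap[N]) In(_ N)              { m.count++ }
func (m *countMap[N]) Val() N              { return m.count }
func (m *countMap[N]) Partial() Partial[N] { return Partial[N]{Value: m.count} }
func (m *countMap[N]) Reset()              { m.count = 0 }

// sumReduce (hypothetical) adds incoming Value fields: the reduce for SUM and COUNT.
type sumReduce[N Number] struct{ sum N }

func (r *sumReduce[N]) Combine(p Partial[N]) { r.sum += p.Value }
func (r *sumReduce[N]) Val() N               { return r.sum }
func (r *sumReduce[N]) Reset()               { r.sum = 0 }

// safeDiv returns 0 for an empty accumulator instead of dividing by zero.
func safeDiv[N Number](sum, count N) N {
	if count == 0 {
		return 0
	}
	return sum / count
}

// mapThenReduce runs one Map per simulated data node and feeds each
// node's Partial into a single Reduce, as the liaison would.
func mapThenReduce[N Number](newMap func() Map[N], r Reduce[N], nodes [][]N) N {
	r.Reset()
	for _, values := range nodes {
		m := newMap()
		for _, v := range values {
			m.In(v)
		}
		r.Combine(m.Partial())
	}
	return r.Val()
}

func main() {
	nodes := [][]float64{{1, 2, 3}, {10}}
	mean := mapThenReduce[float64](func() Map[float64] { return &meanMap[float64]{} }, &meanReduce[float64]{}, nodes)
	count := mapThenReduce[float64](func() Map[float64] { return &countMap[float64]{} }, &sumReduce[float64]{}, nodes)
	fmt.Println(mean, count) // 4 4
}
```

With one data node holding `{1, 2, 3}` and another holding `{10}`, combining partials yields 16 / 4 = 4, whereas averaging the two finalized local means would give (2 + 10) / 2 = 6. The COUNT pair also shows why the current liaison treats COUNT as SUM: counting happens only in the Map phase, and the Reduce phase simply adds the per-node counts.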
## Interface Usage by Context

| Context                                | Interface | Methods used                    |
| -------------------------------------- | --------- | ------------------------------- |
| Standalone (single node via `Analyze`) | `Map`     | `In()`, `Val()`, `Reset()`      |
| Distributed - data node                | `Map`     | `In()`, `Partial()`, `Reset()`  |
| Distributed - liaison node             | `Reduce`  | `Combine()`, `Val()`, `Reset()` |
| TopN post-processor                    | `Map`     | `In()`, `Val()`, `Reset()`      |

## Usage Changes

### Standalone / Data node: [measure_plan_aggregation.go](pkg/query/logical/measure/measure_plan_aggregation.go)

The `aggregationPlan` and its iterators (`aggGroupIterator`, `aggAllIterator`) use `Map[N]` instead of `Func[N]`:

- **Standalone (single node)**: Call `mapFunc.In(v)` for each raw value, then `mapFunc.Val()` for the final result. This is a drop-in replacement for the old `Func`, with the same `In`/`Val`/`Reset` contract.
- **Distributed data node**: Call `mapFunc.In(v)` for each raw value, then `mapFunc.Partial()` to produce the intermediate result. Convert it via `PartialToFieldValues` for the wire response.

### Liaison node: [measure_plan_aggregation.go](pkg/query/logical/measure/measure_plan_aggregation.go)

The liaison-side aggregation plan and iterators use `Reduce[N]`:

- Receive intermediate results from data nodes and convert them back via `FieldValuesToPartial`
- Call `reduceFunc.Combine(partial)` for each intermediate result
- Call `reduceFunc.Val()` to produce the final result

This likely means splitting the current `aggregationPlan` into two variants (map vs. reduce) or parameterizing it with a mode flag, since the data node plan and the liaison node plan serve different roles.
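To make the usage table concrete, here is a minimal sketch of a grouped MEAN query run both ways: the standalone path (`In` then `Val` per group) and the distributed path (`In` then `Partial` on each data node, `Combine` then `Val` on the liaison). It is float64-only and MEAN-only for brevity; `partial`, `point`, `mapByGroup`, `standaloneVals`, `dataNodePartials`, and `liaisonReduce` are hypothetical names, and the wire step (`PartialToFieldValues` / `FieldValuesToPartial`) is skipped by passing partials in memory.

```go
package main

import "fmt"

// Hypothetical float64-only stand-ins for Partial[N], Map[N], and Reduce[N] (MEAN only).
type partial struct{ Value, Count float64 }

type meanMap struct{ sum, count float64 }

func (m *meanMap) In(v float64)     { m.sum += v; m.count++ }
func (m *meanMap) Partial() partial { return partial{Value: m.sum, Count: m.count} }
func (m *meanMap) Val() float64     { return div(m.sum, m.count) }

type meanReduce struct{ sum, count float64 }

func (r *meanReduce) Combine(p partial) { r.sum += p.Value; r.count += p.Count }
func (r *meanReduce) Val() float64      { return div(r.sum, r.count) }

// div avoids dividing by zero for an empty group.
func div(sum, count float64) float64 {
	if count == 0 {
		return 0
	}
	return sum / count
}

// point is a hypothetical data point: a group-by key plus one field value.
type point struct {
	group string
	value float64
}

// mapByGroup feeds every raw value into a per-group Map via In.
func mapByGroup(points []point) map[string]*meanMap {
	accs := map[string]*meanMap{}
	for _, p := range points {
		if accs[p.group] == nil {
			accs[p.group] = &meanMap{}
		}
		accs[p.group].In(p.value)
	}
	return accs
}

// standaloneVals is the single-node path: Map, then Val per group.
func standaloneVals(points []point) map[string]float64 {
	out := map[string]float64{}
	for g, m := range mapByGroup(points) {
		out[g] = m.Val()
	}
	return out
}

// dataNodePartials is the data node path: Map, then Partial per group.
func dataNodePartials(points []point) map[string]partial {
	out := map[string]partial{}
	for g, m := range mapByGroup(points) {
		out[g] = m.Partial()
	}
	return out
}

// liaisonReduce combines every node's partial per group, then calls Val.
func liaisonReduce(responses []map[string]partial) map[string]float64 {
	accs := map[string]*meanReduce{}
	for _, resp := range responses {
		for g, p := range resp {
			if accs[g] == nil {
				accs[g] = &meanReduce{}
			}
			accs[g].Combine(p)
		}
	}
	out := map[string]float64{}
	for g, r := range accs {
		out[g] = r.Val()
	}
	return out
}

func main() {
	node1 := []point{{"svc-a", 1}, {"svc-a", 2}, {"svc-b", 5}}
	node2 := []point{{"svc-a", 6}, {"svc-b", 7}}
	dist := liaisonReduce([]map[string]partial{dataNodePartials(node1), dataNodePartials(node2)})
	local := standaloneVals(append(append([]point{}, node1...), node2...))
	fmt.Println(dist["svc-a"], local["svc-a"], dist["svc-b"], local["svc-b"]) // 3 3 6 6
}
```

Both paths agree per group. Averaging the per-node means for `svc-a` instead would give (1.5 + 6) / 2 = 3.75, which is the error the Map/Reduce split avoids.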
### Distributed plan: [measure_plan_distributed.go](pkg/query/logical/measure/measure_plan_distributed.go)

- Remove the `needCompletePushDownAgg` flag (all aggregation functions can now be pushed down)
- Remove the COUNT-to-SUM conversion hack in [measure_analyzer.go:185-186](pkg/query/logical/measure/measure_analyzer.go)
- The distributed plan always pushes down the Map phase, and the liaison always runs Reduce
- The deduplication logic (`deduplicateAggregatedDataPointsWithShard`) remains for replica handling

### TopN post-processor: [topn_post_processor.go](banyand/measure/topn_post_processor.go)

Replace `int64Func aggregation.Func[int64]` with `mapFunc aggregation.Map[int64]` on `topNAggregatorItem`. TopN feeds raw values into the aggregation, so it needs Map semantics (not Reduce). For COUNT, `Map.In()` correctly increments a counter rather than summing values:

```go
type topNAggregatorItem struct {
	mapFunc aggregation.Map[int64]
	// ... other fields unchanged
}
```

Usage changes:

- `exist.mapFunc.In(item.val)` instead of `exist.int64Func.In(item.val)` (same signature)
- `item.mapFunc.Val()` instead of `item.int64Func.Val()` (same signature)

## Execution Flow (distributed)

```mermaid
sequenceDiagram
    participant Client
    participant Liaison as Liaison_Node
    participant Data1 as Data_Node_1
    participant Data2 as Data_Node_2

    Client->>Liaison: QueryRequest with Agg
    Liaison->>Data1: InternalQueryRequest (push down agg)
    Liaison->>Data2: InternalQueryRequest (push down agg)

    Note over Data1: Map phase
    Data1->>Data1: mapFunc.In(rawValue) per data point
    Data1->>Data1: mapFunc.Partial() -> intermediate
    Data1-->>Liaison: InternalQueryResponse with Partial results

    Note over Data2: Map phase
    Data2->>Data2: mapFunc.In(rawValue) per data point
    Data2->>Data2: mapFunc.Partial() -> intermediate
    Data2-->>Liaison: InternalQueryResponse with Partial results

    Note over Liaison: Reduce phase
    Liaison->>Liaison: reduceFunc.Combine(partial1)
    Liaison->>Liaison: reduceFunc.Combine(partial2)
    Liaison->>Liaison: reduceFunc.Val() -> final result
    Liaison-->>Client: QueryResponse
```

related to #13291

### Use case

_No response_

### Related issues

_No response_

### Are you willing to submit a pull request to implement this on your own?

- [ ] Yes I am willing to submit a pull request on my own!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
