hanahmily opened a new issue, #13694:
URL: https://github.com/apache/skywalking/issues/13694

   ### Search before asking
   
   - [x] I had searched in the [issues](https://github.com/apache/skywalking/issues?q=is%3Aissue) and found no similar feature requirement.
   
   
   ### Description
   
   ## Current Architecture
   
   The `Func[N]` interface (`In`, `Val`, `Reset`) is used as a monolithic 
accumulator everywhere:
   
   - Data node aggregation plans (via `Analyze`)
   - Liaison node aggregation plans (via `DistributedAnalyze`)
   - TopN post-processor
   
   The distributed push-down is handled with hacks:
   
   - `needCompletePushDownAgg` only supports MAX/MIN/SUM/COUNT (not MEAN)
   - COUNT is silently converted to SUM at the liaison ([measure_analyzer.go:185-186](pkg/query/logical/measure/measure_analyzer.go))
   - The liaison only deduplicates push-down results rather than properly reducing them
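
   To illustrate why MEAN cannot be pushed down when data nodes return only finalized values, here is a minimal sketch with made-up numbers. The `partial` struct and every function name below are hypothetical and exist only for this illustration; they are not taken from the BanyanDB code base.

   ```go
   package main

   import "fmt"

   // partial is a hypothetical (sum, count) pair, mirroring the idea of Partial[N].
   type partial struct {
       sum   float64
       count float64
   }

   // localMean is what a data node would return if it sent only final values.
   func localMean(vals []float64) float64 {
       var sum float64
       for _, v := range vals {
           sum += v
       }
       return sum / float64(len(vals))
   }

   // localPartial is what a data node would return under a map/reduce split.
   func localPartial(vals []float64) partial {
       var p partial
       for _, v := range vals {
           p.sum += v
           p.count++
       }
       return p
   }

   // meanOfMeans averages the per-node means and ignores how many points each node holds.
   func meanOfMeans(nodes [][]float64) float64 {
       var total float64
       for _, vals := range nodes {
           total += localMean(vals)
       }
       return total / float64(len(nodes))
   }

   // meanOfPartials merges the per-node partials first and divides once.
   func meanOfPartials(nodes [][]float64) float64 {
       var merged partial
       for _, vals := range nodes {
           p := localPartial(vals)
           merged.sum += p.sum
           merged.count += p.count
       }
       return merged.sum / merged.count
   }

   func main() {
       nodes := [][]float64{{1, 2, 3}, {10}} // made-up values on two data nodes
       fmt.Println(meanOfMeans(nodes))       // prints 6
       fmt.Println(meanOfPartials(nodes))    // prints 4
   }
   ```

   The two results differ because the nodes hold different numbers of points, so a correct MEAN push-down has to ship the sum and the count separately.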
   
   ## New Interface Design
   
   ### Intermediate type in [aggregation.go](pkg/query/aggregation/aggregation.go)
   
   ```go
   // Partial represents the intermediate result of a Map phase.
   // For most functions, only Value is meaningful.
   // For MEAN, both Value (sum) and Count are used.
   type Partial[N Number] struct {
       Value N
       Count N
   }
   ```
   
   ### Map interface (replaces Func, used by data node, standalone, and TopN)
   
   ```go
   // Map accumulates raw values and produces aggregation results.
   // It serves as the local accumulator for raw data points.
   type Map[N Number] interface {
       // In feeds a raw value into the accumulator.
       In(N)
       // Val returns the locally finalized aggregation result.
       // For MEAN, this computes sum/count. For others, same as
       // Partial().Value.
       Val() N
       // Partial returns the intermediate result for the reduce phase.
       Partial() Partial[N]
       // Reset clears the accumulator for reuse.
       Reset()
   }
   ```
   
   ### Reduce interface (new, for liaison node use)
   
   ```go
   // Reduce combines intermediate results from Map phases into a final value.
   type Reduce[N Number] interface {
       // Combine feeds an intermediate result from a Map phase.
       Combine(Partial[N])
       // Val returns the final aggregated value.
       Val() N
       // Reset clears the accumulator for reuse.
       Reset()
   }
   ```
   
   ### Factory functions
   
   ```go
   func NewMap[N Number](af modelv1.AggregationFunction) (Map[N], error)
   func NewReduce[N Number](af modelv1.AggregationFunction) (Reduce[N], error)
   ```
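
   One way the `NewMap` factory could dispatch is sketched below. This is only an illustration under stated assumptions: `aggFunc` is a hypothetical stand-in for `modelv1.AggregationFunction`, the `Number` constraint is assumed to cover `int64` and `float64`, the function is named `newMap` to mark it as a sketch, and only SUM and MAX are shown.

   ```go
   package main

   import (
       "errors"
       "fmt"
   )

   // Number is assumed here to cover int64 and float64.
   type Number interface{ ~int64 | ~float64 }

   // aggFunc is a hypothetical stand-in for modelv1.AggregationFunction.
   type aggFunc int

   const (
       aggUnspecified aggFunc = iota
       aggSum
       aggMax
   )

   type Partial[N Number] struct{ Value, Count N }

   type Map[N Number] interface {
       In(N)
       Val() N
       Partial() Partial[N]
       Reset()
   }

   type sumMap[N Number] struct{ sum N }

   func (s *sumMap[N]) In(v N)              { s.sum += v }
   func (s *sumMap[N]) Val() N              { return s.sum }
   func (s *sumMap[N]) Partial() Partial[N] { return Partial[N]{Value: s.sum} }
   func (s *sumMap[N]) Reset()              { s.sum = 0 }

   // maxMap uses a seen flag so the first value always wins over the zero value.
   type maxMap[N Number] struct {
       max  N
       seen bool
   }

   func (m *maxMap[N]) In(v N) {
       if !m.seen || v > m.max {
           m.max, m.seen = v, true
       }
   }
   func (m *maxMap[N]) Val() N              { return m.max }
   func (m *maxMap[N]) Partial() Partial[N] { return Partial[N]{Value: m.max} }
   func (m *maxMap[N]) Reset()              { m.max, m.seen = 0, false }

   var errUnsupported = errors.New("unsupported aggregation function")

   // newMap picks an accumulator for the requested function (sketch only).
   func newMap[N Number](af aggFunc) (Map[N], error) {
       switch af {
       case aggSum:
           return &sumMap[N]{}, nil
       case aggMax:
           return &maxMap[N]{}, nil
       default:
           return nil, fmt.Errorf("%w: %d", errUnsupported, af)
       }
   }

   func main() {
       m, err := newMap[int64](aggMax)
       if err != nil {
           panic(err)
       }
       m.In(-5)
       m.In(-2)
       fmt.Println(m.Val()) // prints -2

       _, err = newMap[int64](aggFunc(99))
       fmt.Println(errors.Is(err, errUnsupported)) // prints true
   }
   ```

   Returning an error for an unknown function matches the `(Map[N], error)` shape of the proposed signature; the concrete error value is an assumption.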
   
   ## Concrete Implementations in [function.go](pkg/query/aggregation/function.go)
   
   ### SUM
   
   - **Map**: Accumulates running sum. `Partial()` returns `{Value: sum}`.
   - **Reduce**: Sums incoming `Value` fields. `Val()` returns total sum.
   
   ### COUNT
   
   - **Map**: Increments counter. `Partial()` returns `{Value: count}`.
   - **Reduce**: Sums incoming `Value` fields (same logic as SUM reduce). 
`Val()` returns total count.
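
   The asymmetry between the COUNT map and reduce sides can be sketched as follows. This is an illustration rather than the actual implementation: the type names `countMap` and `countReduce` are hypothetical, and the `Number` constraint is assumed to cover `int64` and `float64`.

   ```go
   package main

   import "fmt"

   // Number is assumed here to cover int64 and float64.
   type Number interface{ ~int64 | ~float64 }

   type Partial[N Number] struct{ Value, Count N }

   // countMap counts raw values; the values themselves are ignored.
   type countMap[N Number] struct{ count N }

   func (c *countMap[N]) In(_ N)              { c.count++ }
   func (c *countMap[N]) Val() N              { return c.count }
   func (c *countMap[N]) Partial() Partial[N] { return Partial[N]{Value: c.count} }
   func (c *countMap[N]) Reset()              { c.count = 0 }

   // countReduce adds up per-node counts, which is the same logic as a SUM reduce.
   type countReduce[N Number] struct{ total N }

   func (c *countReduce[N]) Combine(p Partial[N]) { c.total += p.Value }
   func (c *countReduce[N]) Val() N               { return c.total }
   func (c *countReduce[N]) Reset()               { c.total = 0 }

   func main() {
       node1, node2 := &countMap[int64]{}, &countMap[int64]{}
       for _, v := range []int64{5, 7, 9} {
           node1.In(v)
       }
       node2.In(100)

       r := &countReduce[int64]{}
       r.Combine(node1.Partial())
       r.Combine(node2.Partial())
       fmt.Println(node1.Val(), node2.Val(), r.Val()) // prints 3 1 4
   }
   ```

   Because the reduce side sums counts explicitly, the analyzer would no longer need to rewrite COUNT into SUM.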
   
   ### MAX
   
   - **Map**: Tracks local maximum. `Partial()` returns `{Value: max}`.
   - **Reduce**: Tracks maximum of incoming `Value` fields. `Val()` returns 
global max.
   
   ### MIN
   
   - **Map**: Tracks local minimum. `Partial()` returns `{Value: min}`.
   - **Reduce**: Tracks minimum of incoming `Value` fields. `Val()` returns 
global min.
   
   ### MEAN (the key case this enables)
   
   - **Map**: Tracks sum and count. `Partial()` returns `{Value: sum, Count: count}`.
   - **Reduce**: Accumulates total sum and total count from incoming partials. `Val()` returns `totalSum / totalCount`.
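
   A minimal sketch of the MEAN pair described above might look like this. The type names `meanMap` and `meanReduce` are hypothetical, the `Number` constraint is assumed to cover `int64` and `float64`, and returning 0 for an empty accumulator is an assumption that the proposal does not specify.

   ```go
   package main

   import "fmt"

   // Number is assumed here to cover int64 and float64.
   type Number interface{ ~int64 | ~float64 }

   type Partial[N Number] struct{ Value, Count N }

   // meanMap is the data-node side: it tracks a running sum and count.
   type meanMap[N Number] struct{ sum, count N }

   func (m *meanMap[N]) In(v N) {
       m.sum += v
       m.count++
   }

   // Val returns the local mean; 0 for an empty accumulator is an assumption.
   func (m *meanMap[N]) Val() N {
       if m.count == 0 {
           return 0
       }
       return m.sum / m.count
   }

   func (m *meanMap[N]) Partial() Partial[N] {
       return Partial[N]{Value: m.sum, Count: m.count}
   }

   func (m *meanMap[N]) Reset() { m.sum, m.count = 0, 0 }

   // meanReduce is the liaison side: it merges sums and counts, then divides once.
   type meanReduce[N Number] struct{ totalSum, totalCount N }

   func (r *meanReduce[N]) Combine(p Partial[N]) {
       r.totalSum += p.Value
       r.totalCount += p.Count
   }

   func (r *meanReduce[N]) Val() N {
       if r.totalCount == 0 {
           return 0
       }
       return r.totalSum / r.totalCount
   }

   func (r *meanReduce[N]) Reset() { r.totalSum, r.totalCount = 0, 0 }

   func main() {
       node1, node2 := &meanMap[int64]{}, &meanMap[int64]{}
       for _, v := range []int64{1, 2, 3} {
           node1.In(v)
       }
       node2.In(10)

       r := &meanReduce[int64]{}
       r.Combine(node1.Partial())
       r.Combine(node2.Partial())
       fmt.Println(node1.Val(), node2.Val(), r.Val()) // prints 2 10 4
   }
   ```

   With `int64`, the final division is integer division, so the result truncates toward zero.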
   
   ## Serialization Helpers in [aggregation.go](pkg/query/aggregation/aggregation.go)
   
   ```go
   // PartialToFieldValues converts a Partial to field values for wire transport.
   func PartialToFieldValues[N Number](p Partial[N]) ([]*modelv1.FieldValue, error)
   
   // FieldValuesToPartial converts field values from wire transport to a Partial.
   func FieldValuesToPartial[N Number](fvs []*modelv1.FieldValue) (Partial[N], error)
   ```
   
   For non-MEAN functions, `PartialToFieldValues` produces a single `FieldValue`. For MEAN, it produces two (sum and count).
   
   ## Interface Usage by Context
   
   
   | Context                                | Interface | Methods used                    |
   | -------------------------------------- | --------- | ------------------------------- |
   | Standalone (single node via `Analyze`) | `Map`     | `In()`, `Val()`, `Reset()`      |
   | Distributed - data node                | `Map`     | `In()`, `Partial()`, `Reset()`  |
   | Distributed - liaison node             | `Reduce`  | `Combine()`, `Val()`, `Reset()` |
   | TopN post-processor                    | `Map`     | `In()`, `Val()`, `Reset()`      |
   
   
   ## Usage Changes
   
   ### Standalone / Data node: [measure_plan_aggregation.go](pkg/query/logical/measure/measure_plan_aggregation.go)
   
   The `aggregationPlan` and its iterators (`aggGroupIterator`, 
`aggAllIterator`) use `Map[N]` instead of `Func[N]`:
   
   - **Standalone (single node)**: Call `mapFunc.In(v)` for each raw value, 
then `mapFunc.Val()` for the final result. This is a drop-in replacement for 
the old `Func` — same `In`/`Val`/`Reset` contract.
   - **Distributed data node**: Call `mapFunc.In(v)` for each raw value, then 
`mapFunc.Partial()` to produce the intermediate result. Convert via 
`PartialToFieldValues` for the wire response.
   
   ### Liaison node: [measure_plan_aggregation.go](pkg/query/logical/measure/measure_plan_aggregation.go)
   
   The liaison-side aggregation plan and iterators use `Reduce[N]`:
   
   - Receive intermediate results from data nodes
   - Call `reduceFunc.Combine(partial)` for each intermediate
   - Call `reduceFunc.Val()` to produce the final result
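
   The three liaison steps above can be sketched as a small loop. This is a hedged illustration: `reduceAll` and `minReduce` are hypothetical names, the `Number` constraint is assumed to cover `int64` and `float64`, and the decoding of wire responses into partials is left out.

   ```go
   package main

   import "fmt"

   // Number is assumed here to cover int64 and float64.
   type Number interface{ ~int64 | ~float64 }

   type Partial[N Number] struct{ Value, Count N }

   type Reduce[N Number] interface {
       Combine(Partial[N])
       Val() N
       Reset()
   }

   // minReduce keeps the smallest Value seen; seen guards the first Combine call.
   type minReduce[N Number] struct {
       min  N
       seen bool
   }

   func (m *minReduce[N]) Combine(p Partial[N]) {
       if !m.seen || p.Value < m.min {
           m.min, m.seen = p.Value, true
       }
   }
   func (m *minReduce[N]) Val() N { return m.min }
   func (m *minReduce[N]) Reset() { m.min, m.seen = 0, false }

   // reduceAll is a hypothetical helper: feed every data-node partial, then finalize.
   func reduceAll[N Number](r Reduce[N], partials []Partial[N]) N {
       r.Reset()
       for _, p := range partials {
           r.Combine(p)
       }
       return r.Val()
   }

   func main() {
       // Made-up partials, as if already decoded from three data-node responses.
       fromDataNodes := []Partial[int64]{{Value: 7}, {Value: 3}, {Value: 9}}
       fmt.Println(reduceAll[int64](&minReduce[int64]{}, fromDataNodes)) // prints 3
   }
   ```

   Because the loop depends only on the `Reduce` interface, the same liaison code path would serve every aggregation function.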
   
   This likely means splitting the current `aggregationPlan` into two variants 
(map vs reduce) or parameterizing it with a mode flag, since the data node plan 
and liaison node plan serve different roles.
   
   ### Distributed plan: [measure_plan_distributed.go](pkg/query/logical/measure/measure_plan_distributed.go)
   
   - Remove the `needCompletePushDownAgg` flag (all aggregation functions can 
now be pushed down)
   - Remove the COUNT-to-SUM conversion hack in 
[measure_analyzer.go:185-186](pkg/query/logical/measure/measure_analyzer.go)
   - The distributed plan always pushes down the Map phase and the liaison 
always runs Reduce
   - The deduplication logic (`deduplicateAggregatedDataPointsWithShard`) 
remains for replica handling
   
   ### TopN post-processor: [topn_post_processor.go](banyand/measure/topn_post_processor.go)
   
   Replace `int64Func aggregation.Func[int64]` with `mapFunc aggregation.Map[int64]` on `topNAggregatorItem`. TopN feeds raw values into the aggregation, so it needs Map semantics (not Reduce). For COUNT, `Map.In()` correctly increments a counter rather than summing values:
   
   ```go
   type topNAggregatorItem struct {
       mapFunc aggregation.Map[int64]
       // ... other fields unchanged
   }
   ```
   
   Usage changes:
   
   - `exist.mapFunc.In(item.val)` instead of `exist.int64Func.In(item.val)` 
(same signature)
   - `item.mapFunc.Val()` instead of `item.int64Func.Val()` (same signature)
   
   ## Execution Flow (distributed)
   
   ```mermaid
   sequenceDiagram
       participant Client
       participant Liaison as Liaison_Node
       participant Data1 as Data_Node_1
       participant Data2 as Data_Node_2
   
       Client->>Liaison: QueryRequest with Agg
       Liaison->>Data1: InternalQueryRequest (push down agg)
       Liaison->>Data2: InternalQueryRequest (push down agg)
   
       Note over Data1: Map phase
       Data1->>Data1: mapFunc.In(rawValue) per data point
       Data1->>Data1: mapFunc.Partial() -> intermediate
       Data1-->>Liaison: InternalQueryResponse with Partial results
   
       Note over Data2: Map phase
       Data2->>Data2: mapFunc.In(rawValue) per data point
       Data2->>Data2: mapFunc.Partial() -> intermediate
       Data2-->>Liaison: InternalQueryResponse with Partial results
   
       Note over Liaison: Reduce phase
       Liaison->>Liaison: reduceFunc.Combine(partial1)
       Liaison->>Liaison: reduceFunc.Combine(partial2)
       Liaison->>Liaison: reduceFunc.Val() -> final result
       Liaison-->>Client: QueryResponse
   ```
   
   
   related to #13291 
   
   ### Use case
   
   _No response_
   
   ### Related issues
   
   _No response_
   
   ### Are you willing to submit a pull request to implement this on your own?
   
   - [ ] Yes I am willing to submit a pull request on my own!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   

