Copilot commented on code in PR #932:
URL: 
https://github.com/apache/skywalking-banyandb/pull/932#discussion_r2678403357


##########
pkg/query/logical/measure/measure_plan_distributed.go:
##########
@@ -556,3 +566,64 @@ func deduplicateAggregatedDataPoints(dataPoints 
[]*measurev1.DataPoint, groupByT
        }
        return result, nil
 }
+
+// getCountValue extracts count value from a data point field.
+func getCountValue(dp *measurev1.DataPoint, fieldName string) (int64, bool) {
+       for _, field := range dp.GetFields() {
+               if field.GetName() == fieldName {
+                       fieldValue := field.GetValue()
+                       switch v := fieldValue.GetValue().(type) {
+                       case *modelv1.FieldValue_Int:
+                               return v.Int.GetValue(), true
+                       case *modelv1.FieldValue_Float:
+                               return int64(v.Float.GetValue()), true
+                       default:
+                               return 0, false
+                       }
+               }
+       }
+       return 0, false
+}
+
+// aggregateCountDataPoints aggregates count results by summing values per 
group.

Review Comment:
   The function comment states "aggregates count results by summing values per 
group" but the implementation at lines 596-606 does not actually sum the count 
values. Instead, it validates that duplicate entries have identical counts and 
keeps only one entry per group key. This is inconsistent with the comment. 
Either the comment should be updated to reflect the actual deduplication 
behavior, or the implementation should sum the counts from multiple replicas as 
the comment suggests.



##########
pkg/query/logical/measure/measure_plan_distributed.go:
##########
@@ -556,3 +566,64 @@ func deduplicateAggregatedDataPoints(dataPoints 
[]*measurev1.DataPoint, groupByT
        }
        return result, nil
 }
+
+// getCountValue extracts count value from a data point field.
+func getCountValue(dp *measurev1.DataPoint, fieldName string) (int64, bool) {
+       for _, field := range dp.GetFields() {
+               if field.GetName() == fieldName {
+                       fieldValue := field.GetValue()
+                       switch v := fieldValue.GetValue().(type) {
+                       case *modelv1.FieldValue_Int:
+                               return v.Int.GetValue(), true
+                       case *modelv1.FieldValue_Float:
+                               return int64(v.Float.GetValue()), true
+                       default:
+                               return 0, false
+                       }
+               }
+       }
+       return 0, false
+}
+
+// aggregateCountDataPoints aggregates count results by summing values per 
group.
+func aggregateCountDataPoints(dataPoints []*measurev1.DataPoint, 
groupByTagsRefs [][]*logical.TagRef, fieldName string) ([]*measurev1.DataPoint, 
error) {
+       groupMap := make(map[uint64]*measurev1.DataPoint)
+       for _, dp := range dataPoints {
+               key, err := formatGroupByKey(dp, groupByTagsRefs)
+               if err != nil {
+                       return nil, err
+               }
+               if existingDp, exists := groupMap[key]; exists {
+                       existingCount, existingOk := getCountValue(existingDp, 
fieldName)
+                       currentCount, currentOk := getCountValue(dp, fieldName)
+                       if !existingOk || !currentOk {
+                               return nil, fmt.Errorf("inconsistent data: 
group key %d has data points with missing count field", key)
+                       }
+                       if existingCount != currentCount {
+                               return nil, fmt.Errorf("inconsistent data: 
group key %d has different count values (%d vs %d) from multiple replicas", 
key, existingCount, currentCount)
+                       }
+               }
+               groupMap[key] = dp
+       }
+       result := make([]*measurev1.DataPoint, 0, len(groupMap))
+       for _, dp := range groupMap {
+               if dp == nil {
+                       continue
+               }

Review Comment:
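   The nil check at lines 610-612 is most likely unnecessary. groupMap is only 
populated by `groupMap[key] = dp` (line 606) with entries taken from the input 
dataPoints slice, so a nil value can appear only if that slice itself contains 
nil pointers. If callers guarantee non-nil data points, this check can never be 
true and only adds complexity; if they do not, nil entries should be rejected or 
skipped before grouping rather than after it.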
   ```suggestion
   
   ```



##########
pkg/query/logical/measure/measure_plan_distributed.go:
##########
@@ -556,3 +566,64 @@ func deduplicateAggregatedDataPoints(dataPoints 
[]*measurev1.DataPoint, groupByT
        }
        return result, nil
 }
+
+// getCountValue extracts count value from a data point field.
+func getCountValue(dp *measurev1.DataPoint, fieldName string) (int64, bool) {
+       for _, field := range dp.GetFields() {
+               if field.GetName() == fieldName {
+                       fieldValue := field.GetValue()
+                       switch v := fieldValue.GetValue().(type) {
+                       case *modelv1.FieldValue_Int:
+                               return v.Int.GetValue(), true
+                       case *modelv1.FieldValue_Float:
+                               return int64(v.Float.GetValue()), true
+                       default:
+                               return 0, false
+                       }
+               }
+       }
+       return 0, false
+}
+
+// aggregateCountDataPoints aggregates count results by summing values per 
group.
+func aggregateCountDataPoints(dataPoints []*measurev1.DataPoint, 
groupByTagsRefs [][]*logical.TagRef, fieldName string) ([]*measurev1.DataPoint, 
error) {
+       groupMap := make(map[uint64]*measurev1.DataPoint)
+       for _, dp := range dataPoints {
+               key, err := formatGroupByKey(dp, groupByTagsRefs)
+               if err != nil {
+                       return nil, err
+               }
+               if existingDp, exists := groupMap[key]; exists {
+                       existingCount, existingOk := getCountValue(existingDp, 
fieldName)
+                       currentCount, currentOk := getCountValue(dp, fieldName)
+                       if !existingOk || !currentOk {
+                               return nil, fmt.Errorf("inconsistent data: 
group key %d has data points with missing count field", key)
+                       }
+                       if existingCount != currentCount {
+                               return nil, fmt.Errorf("inconsistent data: 
group key %d has different count values (%d vs %d) from multiple replicas", 
key, existingCount, currentCount)
+                       }

Review Comment:
   The handling of duplicate group keys does not match the function's stated 
intent. When a duplicate is found (lines 596-606), the code validates that the 
counts are identical and then unconditionally overwrites the existing entry at 
line 606 (`groupMap[key] = dp`), so the last replica's data point is kept rather 
than the first. Because the counts have just been verified to be equal, this 
overwrite is redundant and suggests a misunderstanding of the intended behavior. 
If the counts should be summed (as the function comment says), the 
implementation is incorrect. If deduplication is intended, the overwrite is 
unnecessary and the code should skip to the next iteration after validation.
   ```suggestion
                        }
                        // Counts are consistent; skip inserting a duplicate for this group key.
                        continue
   ```



##########
pkg/query/logical/measure/measure_plan_distributed.go:
##########
@@ -536,9 +536,19 @@ func (s *pushedDownAggregatedIterator) Close() error {
 }
 
 // deduplicateAggregatedDataPoints removes duplicate aggregated results from 
multiple replicas
-// by keeping only one data point per group. Since replicas hold identical 
data, aggregates
-// for the same group are identical across replicas.
-func deduplicateAggregatedDataPoints(dataPoints []*measurev1.DataPoint, 
groupByTagsRefs [][]*logical.TagRef) ([]*measurev1.DataPoint, error) {
+// and aggregates count results by summing values per group.

Review Comment:
   The documentation comment at line 539 states the function "aggregates count 
results by summing values per group", but the actual implementation in 
aggregateCountDataPoints (lines 596-606) performs deduplication rather than 
summation. When a duplicate group key is found, the code validates that counts 
are identical and returns an error if they differ, but it never sums them. This 
creates confusion about the intended behavior. The comment should be corrected 
to accurately describe the deduplication logic.
   ```suggestion
   // and, for count aggregations, deduplicates per group while ensuring counts are consistent across replicas.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to