This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch vectorized-query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit d752f4da65ce028a299b15c6d95b1fac9bb26287
Author: Hongtao Gao <[email protected]>
AuthorDate: Sat May 23 01:27:00 2026 +0000

    feat(e2e+vec/measure): wire vec query into the OAP e2e matrix and harden aggregation reduce against passthrough columns
    
    This bundles two halves of the same effort:
    
    1. e2e matrix gains a vec lane
    
    Every OAP e2e suite (Storage, Trace Profiling, Event, Log, Zipkin,
    Cluster, Rover eBPF/Kind) now runs twice: once on the row path and
    once on the vec path. The suites are expressed as a 2-D matrix
    (test × engine) so the (suite, engine) cross-product expands without
    literal duplication. Job names stay 1-for-1 with the previous
    hand-rolled list (row variant unsuffixed, vec variant " Vec"),
    preserving any dashboard/regex matching over the labels.
    
    The vec engine sets
      BANYANDB_MEASURE_VECTORIZED_FLAGS=--measure-vectorized-enabled=true
      BANYANDB_DEPLOYMENT_FILE=.../banyandb-deployment-vec.yaml
    the latter being inert for non-Rover suites (only the Rover Kind
    e2e-banyandb.yaml consumes it via envsubst — its non-Rover siblings
    use docker-compose with BANYANDB_MEASURE_VECTORIZED_FLAGS).
    
    Every docker-compose under test/e2e-v2/cases threads the env knob
    through with the row-default fallback, so the row lane keeps its
    historical behaviour byte-for-byte. A new banyandb-deployment-vec.yaml
    sibling K8s manifest hard-codes --measure-vectorized-enabled=true for
    the Rover Kind harness.
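
The row-default fallback relies on Compose's `${VAR:-default}` interpolation, which substitutes the default when the variable is unset or empty. The following is a minimal Go sketch of that rule, not code from this commit; `resolveFlags` and `rowDefault` are hypothetical names used only for illustration.

```go
package main

import (
	"fmt"
	"os"
)

// rowDefault is the fallback written into each docker-compose command line.
const rowDefault = "--measure-vectorized-enabled=false"

// resolveFlags mimics ${BANYANDB_MEASURE_VECTORIZED_FLAGS:-<rowDefault>}:
// an unset or empty variable yields the row default, anything else is
// passed through unchanged. The lookup parameter is a hypothetical seam
// so the rule can be exercised without touching the process environment.
func resolveFlags(lookup func(string) (string, bool)) string {
	if v, ok := lookup("BANYANDB_MEASURE_VECTORIZED_FLAGS"); ok && v != "" {
		return v
	}
	return rowDefault
}

func main() {
	// With the variable unset this prints the row-lane command line.
	fmt.Println("standalone " + resolveFlags(os.LookupEnv))
}
```

Under this reading, the row lane (which sets no engine env) resolves to the explicit `=false` flag, and the vec lane resolves to whatever the matrix exports.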
    
    2. Vec measure operators handle TagValue/FieldValue passthrough cells
    
    When a distributed reduce sees partials that travelled through the
    DataPoint egress (or that arrived with row-compatible passthrough
    columns), the group key and partial-field cells land as
    ColumnTypeTagValue / ColumnTypeFieldValue instead of native scalar
    columns. Three concrete failure modes:
    
      * BatchAggregation.combinePartial type-asserted the partial column
        to TypedColumn[int64|float64] and panicked on FieldValue.
        Fix: route through new partialIntValue / partialFloatValue
        decoders that unwrap the FieldValue oneof.
      * appendKeyComponent had no case for TagValue / FieldValue, so
        distinct entity-id keys arriving as passthrough cells collapsed
        to the same (empty) hash. Fix: per-type sub-helpers
        (appendIntKey / appendFloatKey / appendStringKey / appendBytesKey)
        plus appendTagValueKey / appendFieldValueKey that route by oneof
        variant — distinct keys stay distinct.
      * bindAggReduceSpecs picked the first column matching the agg
        output name even when a numeric duplicate existed alongside a
        passthrough field. Fix: prefer the numeric column when present;
        fall back to the FieldValue passthrough when only the
        passthrough is in the partial schema.
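
The group-key collapse in the second bullet can be reproduced in isolation. This is a simplified sketch under stated assumptions: `cell` and `appendKey` are hypothetical stand-ins for the passthrough oneof and for appendKeyComponent, and only the int and string encodings described in groupby.go are modelled.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// cell is a hypothetical stand-in for a passthrough cell; the real code
// switches on the modelv1.TagValue / FieldValue oneof instead.
type cell struct {
	kind string // "int", "str", or anything else for an unhandled variant
	i    int64
	s    string
}

// appendKey encodes one key component: ints as 8 little-endian bytes,
// strings as a 4-byte little-endian length prefix plus the raw bytes.
// An unhandled kind appends nothing, which is how distinct keys ended up
// sharing the same empty encoding before the fix.
func appendKey(dst []byte, c cell) []byte {
	switch c.kind {
	case "int":
		var b [8]byte
		binary.LittleEndian.PutUint64(b[:], uint64(c.i))
		return append(dst, b[:]...)
	case "str":
		var lb [4]byte
		binary.LittleEndian.PutUint32(lb[:], uint32(len(c.s)))
		dst = append(dst, lb[:]...)
		return append(dst, c.s...)
	}
	return dst
}

func main() {
	consumer := appendKey(nil, cell{kind: "str", s: "e2e-service-consumer"})
	provider := appendKey(nil, cell{kind: "str", s: "e2e-service-provider"})
	fmt.Println("handled kinds collide:", string(consumer) == string(provider))

	// Two different cells of an unhandled kind both encode to the empty key.
	a := appendKey(nil, cell{kind: "other", s: "x"})
	b := appendKey(nil, cell{kind: "other", s: "y"})
	fmt.Println("unhandled kinds collide:", string(a) == string(b))
}
```

Routing each oneof variant to the matching scalar encoder, as the commit does, moves passthrough cells from the second case into the first.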
    
    Three new unit tests in aggregation_test.go cover (a) numeric-
    preferred binding when a duplicate passthrough exists, (b) FieldValue
    fallback, (c) TagValue group-key collapse guard.
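
The binding rule exercised by tests (a) and (b) can be sketched as follows. This is an illustrative reduction, not the code in reduce.go: `colDef`, `colType` and `bindColumn` are hypothetical, the column names are placeholders, and the Role check performed by bindAggReduceSpecs is omitted for brevity.

```go
package main

import "fmt"

// colType is a hypothetical, trimmed-down column type enum.
type colType int

const (
	typeInt64 colType = iota
	typeFloat64
	typeString
	typeFieldValue
)

// colDef is a hypothetical stand-in for a schema column definition.
type colDef struct {
	name string
	typ  colType
}

// bindColumn returns the index of the column to reduce for the given
// output name. The first numeric (int64/float64) match wins; if none
// exists, the first FieldValue passthrough match is used; -1 means the
// name is absent from the schema.
func bindColumn(cols []colDef, name string) int {
	fallback := -1
	for i, c := range cols {
		if c.name != name {
			continue
		}
		if c.typ == typeInt64 || c.typ == typeFloat64 {
			return i
		}
		if c.typ == typeFieldValue && fallback < 0 {
			fallback = i
		}
	}
	return fallback
}

func main() {
	both := []colDef{
		{"shard", typeInt64},
		{"g", typeString},
		{"sum_v", typeFieldValue},
		{"sum_v", typeInt64},
	}
	passthroughOnly := both[:3]
	fmt.Println(bindColumn(both, "sum_v"), bindColumn(passthroughOnly, "sum_v"))
}
```

The two schemas mirror the layouts used by the new tests: with a numeric duplicate present the numeric column is chosen, otherwise the passthrough column is.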
    
    via [HAPI](https://hapi.run)
    
    Co-Authored-By: HAPI <[email protected]>
---
 .github/workflows/e2e.yml                          |  22 +++-
 pkg/query/vectorized/measure/aggregation.go        |   3 +
 pkg/query/vectorized/measure/aggregation_reduce.go |  61 +++++++++--
 pkg/query/vectorized/measure/aggregation_test.go   | 118 +++++++++++++++++++++
 pkg/query/vectorized/measure/groupby.go            |  90 ++++++++++++----
 pkg/query/vectorized/measure/reduce.go             |  16 ++-
 test/e2e-v2/cases/cluster/docker-compose.yml       |   4 +-
 .../e2e-v2/cases/event/banyandb/docker-compose.yml |   2 +-
 .../lifecycle/data-generator/docker-compose.yml    |   2 +-
 test/e2e-v2/cases/lifecycle/docker-compose.yml     |   8 +-
 test/e2e-v2/cases/log/banyandb/docker-compose.yml  |   1 +
 .../profiling/trace/banyandb/docker-compose.yml    |   2 +-
 ...eployment.yaml => banyandb-deployment-vec.yaml} |   1 +
 .../istio/banyandb/banyandb-deployment.yaml        |   1 +
 .../rover/process/istio/banyandb/e2e-banyandb.yaml |   2 +-
 .../cases/storage/banyandb/docker-compose.yml      |   2 +-
 .../cases/zipkin/banyandb/docker-compose.yml       |   1 +
 17 files changed, 294 insertions(+), 42 deletions(-)

diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml
index 81ba74f61..d205d69e6 100644
--- a/.github/workflows/e2e.yml
+++ b/.github/workflows/e2e.yml
@@ -48,12 +48,20 @@ jobs:
           tag: ${{ github.sha }}
 
   StoragePlugins:
-    name: ${{ matrix.test.name }}
+    name: ${{ matrix.test.name }}${{ matrix.engine.suffix }}
     runs-on: ${{ matrix.test.runs-on || 'ubuntu-latest' }}
     timeout-minutes: 90
     needs: [DockerImage]
     strategy:
       fail-fast: false
+      # Two-dimensional matrix: (suite × engine). The `engine` axis pivots
+      # every suite into a (row, vec) pair without literal duplication. The
+      # row variant keeps the historical CI label (empty suffix); the vec
+      # variant appends " Vec" so existing dashboards/regex over job names
+      # keep matching.
+      #
+      # BANYANDB_DEPLOYMENT_FILE in the vec engine env is consumed only by
+      # the Rover e2e-banyandb.yaml; setting it on other vec runs is inert.
       matrix:
         test:
           - name: Storage
@@ -64,6 +72,8 @@ jobs:
             config: test/e2e-v2/cases/event/banyandb/e2e.yaml
           - name: Log
             config: test/e2e-v2/cases/log/banyandb/e2e.yaml
+          - name: Zipkin
+            config: test/e2e-v2/cases/zipkin/banyandb/e2e.yaml
           - name: Cluster
             config: test/e2e-v2/cases/cluster/e2e.yaml
           - name: Rover eBPF - Istio mesh, Kind
@@ -75,6 +85,12 @@ jobs:
           # TODO: Enable Lifecycle test when it is ready
           # - name: Lifecycle
           #   config: test/e2e-v2/cases/lifecycle/e2e.yaml
+        engine:
+          - suffix: ""
+          - suffix: " Vec"
+            env: |
+              BANYANDB_MEASURE_VECTORIZED_FLAGS=--measure-vectorized-enabled=true
+              BANYANDB_DEPLOYMENT_FILE=test/e2e-v2/cases/rover/process/istio/banyandb/banyandb-deployment-vec.yaml
     env:
       TAG: ${{ github.sha }}
     steps:
@@ -105,7 +121,9 @@ jobs:
           username: ${{ github.repository_owner }}
           password: ${{ secrets.GITHUB_TOKEN }}
       - name: Set env var
-        run: echo "${{ matrix.test.env }}" >> $GITHUB_ENV
+        run: |
+          echo "${{ matrix.test.env }}" >> $GITHUB_ENV
+          echo "${{ matrix.engine.env }}" >> $GITHUB_ENV
       - name: Generate data for lifecycle
         if: ${{ matrix.test.name == 'Lifecycle' }}
         run: |
diff --git a/pkg/query/vectorized/measure/aggregation.go b/pkg/query/vectorized/measure/aggregation.go
index e625ffef8..3244ad300 100644
--- a/pkg/query/vectorized/measure/aggregation.go
+++ b/pkg/query/vectorized/measure/aggregation.go
@@ -657,5 +657,8 @@ func findShardIDIndex(schema *vectorized.BatchSchema) int {
 // FIELD_TYPE_FLOAT → float64 → FieldValue_Float; see
 // measure_plan_aggregation.go and pkg/query/aggregation).
 func aggOutputType(in vectorized.ColumnType, _ AggFunc) vectorized.ColumnType {
+       if in == vectorized.ColumnTypeFieldValue {
+               return vectorized.ColumnTypeInt64
+       }
        return in
 }
diff --git a/pkg/query/vectorized/measure/aggregation_reduce.go b/pkg/query/vectorized/measure/aggregation_reduce.go
index 8aa264306..3d14e6cd5 100644
--- a/pkg/query/vectorized/measure/aggregation_reduce.go
+++ b/pkg/query/vectorized/measure/aggregation_reduce.go
@@ -20,6 +20,7 @@ package measure
 import (
        "encoding/binary"
 
+       modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/pkg/query/aggregation"
        "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
 )
@@ -122,24 +123,68 @@ func (a *BatchAggregation) combinePartial(b *vectorized.RecordBatch, rowIdx int,
                return
        }
        if slot.inputIsFloat {
-               valueCol := col.(*vectorized.TypedColumn[float64])
-               p := aggregation.Partial[float64]{Value: valueCol.Data()[rowIdx]}
+               p := aggregation.Partial[float64]{Value: partialFloatValue(col, rowIdx)}
                if countIdx >= 0 {
-                       countCol := b.Columns[countIdx].(*vectorized.TypedColumn[float64])
-                       p.Count = countCol.Data()[rowIdx]
+                       p.Count = partialFloatValue(b.Columns[countIdx], rowIdx)
                }
                slot.floatReduce.Combine(p)
                return
        }
-       valueCol := col.(*vectorized.TypedColumn[int64])
-       p := aggregation.Partial[int64]{Value: valueCol.Data()[rowIdx]}
+       p := aggregation.Partial[int64]{Value: partialIntValue(col, rowIdx)}
        if countIdx >= 0 {
-               countCol := b.Columns[countIdx].(*vectorized.TypedColumn[int64])
-               p.Count = countCol.Data()[rowIdx]
+               p.Count = partialIntValue(b.Columns[countIdx], rowIdx)
        }
        slot.intReduce.Combine(p)
 }
 
+func partialIntValue(col vectorized.Column, rowIdx int) int64 {
+       switch c := col.(type) {
+       case *vectorized.TypedColumn[int64]:
+               return c.Data()[rowIdx]
+       case *vectorized.TypedColumn[float64]:
+               return int64(c.Data()[rowIdx])
+       case *vectorized.TypedColumn[*modelv1.FieldValue]:
+               fv := c.Data()[rowIdx]
+               if fv == nil {
+                       return 0
+               }
+               switch value := fv.GetValue().(type) {
+               case *modelv1.FieldValue_Int:
+                       return value.Int.GetValue()
+               case *modelv1.FieldValue_Float:
+                       return int64(value.Float.GetValue())
+               }
+               return 0
+       default:
+               valueCol := col.(*vectorized.TypedColumn[int64])
+               return valueCol.Data()[rowIdx]
+       }
+}
+
+func partialFloatValue(col vectorized.Column, rowIdx int) float64 {
+       switch c := col.(type) {
+       case *vectorized.TypedColumn[float64]:
+               return c.Data()[rowIdx]
+       case *vectorized.TypedColumn[int64]:
+               return float64(c.Data()[rowIdx])
+       case *vectorized.TypedColumn[*modelv1.FieldValue]:
+               fv := c.Data()[rowIdx]
+               if fv == nil {
+                       return 0
+               }
+               switch value := fv.GetValue().(type) {
+               case *modelv1.FieldValue_Float:
+                       return value.Float.GetValue()
+               case *modelv1.FieldValue_Int:
+                       return float64(value.Int.GetValue())
+               }
+               return 0
+       default:
+               valueCol := col.(*vectorized.TypedColumn[float64])
+               return valueCol.Data()[rowIdx]
+       }
+}
+
 // writeReduce emits the slot's reduced final value to the typed output
 // column. Mirrors aggSlot.write but reads from the Reduce accumulator
 // (whose Val() handles MEAN finalization by dividing Sum by Count).
diff --git a/pkg/query/vectorized/measure/aggregation_test.go b/pkg/query/vectorized/measure/aggregation_test.go
index b66d0aafd..141aaba53 100644
--- a/pkg/query/vectorized/measure/aggregation_test.go
+++ b/pkg/query/vectorized/measure/aggregation_test.go
@@ -680,6 +680,124 @@ func TestBatchAggregation_AggModeReduce_CombinesPartialsAcrossShards(t *testing.
        }
 }
 
+// TestBatchAggregation_AggModeReduce_BindsNumericDuplicateField asserts the
+// liaison reduce binds the numeric partial column when a schema also carries a
+// passthrough field with the same name. Distributed multi-source schemas may
+// carry row-compatible passthrough fields alongside the AggModeMap partial; the
+// reducer must not pick the passthrough column and panic on type assertion.
+func TestBatchAggregation_AggModeReduce_BindsNumericDuplicateField(t *testing.T) {
+       s := vectorized.NewBatchSchema([]vectorized.ColumnDef{
+               {Role: vectorized.RoleShardID, Name: shardIDOutputName, Type: vectorized.ColumnTypeInt64},
+               {Role: vectorized.RoleTag, TagFamily: "f", Name: "g", Type: vectorized.ColumnTypeString},
+               {Role: vectorized.RoleField, Name: "sum_v", Type: vectorized.ColumnTypeFieldValue},
+               {Role: vectorized.RoleField, Name: "sum_v", Type: vectorized.ColumnTypeInt64},
+       })
+       specs, bindErr := bindAggReduceSpecs(s, []AggReduceSpec{{OutputName: "sum_v", Func: AggSum}})
+       if bindErr != nil {
+               t.Fatalf("bindAggReduceSpecs: %v", bindErr)
+       }
+       if got := specs[0].InputCol; got != 3 {
+               t.Fatalf("InputCol = %d, want numeric duplicate column 3", got)
+       }
+       op := NewBatchAggregation(s, []int{1}, specs, AggModeReduce, 8, vectorized.NewMemoryTracker(1<<30), 0)
+       defer op.Close()
+       b := vectorized.NewRecordBatch(s, 2)
+       b.Columns[0].(*vectorized.TypedColumn[int64]).Append(1)
+       b.Columns[1].(*vectorized.TypedColumn[string]).Append("a")
+       b.Columns[2].(*vectorized.TypedColumn[*modelv1.FieldValue]).Append(nil)
+       b.Columns[3].(*vectorized.TypedColumn[int64]).Append(10)
+       b.Columns[0].(*vectorized.TypedColumn[int64]).Append(2)
+       b.Columns[1].(*vectorized.TypedColumn[string]).Append("a")
+       b.Columns[2].(*vectorized.TypedColumn[*modelv1.FieldValue]).Append(nil)
+       b.Columns[3].(*vectorized.TypedColumn[int64]).Append(20)
+       b.Len = 2
+       batches := feedReduce(t, op, b)
+       if len(batches) != 1 || batches[0].Len != 1 {
+               t.Fatalf("expected 1 output batch with 1 row, got %d batches", len(batches))
+       }
+       if got := batches[0].Columns[1].(*vectorized.TypedColumn[int64]).Data()[0]; got != 30 {
+               t.Fatalf("sum_v = %d, want 30", got)
+       }
+}
+
+// TestBatchAggregation_AggModeReduce_BindsFieldValueFallback covers the hidden
+// criteria path where a data-node iterator wrapper serializes partial rows
+// through DataPoint egress, so the wire frame carries FieldValue passthrough
+// columns instead of native numeric partial columns.
+func TestBatchAggregation_AggModeReduce_BindsFieldValueFallback(t *testing.T) {
+       s := vectorized.NewBatchSchema([]vectorized.ColumnDef{
+               {Role: vectorized.RoleShardID, Name: shardIDOutputName, Type: vectorized.ColumnTypeInt64},
+               {Role: vectorized.RoleTag, TagFamily: "f", Name: "g", Type: vectorized.ColumnTypeString},
+               {Role: vectorized.RoleField, Name: "sum_v", Type: vectorized.ColumnTypeFieldValue},
+       })
+       specs, bindErr := bindAggReduceSpecs(s, []AggReduceSpec{{OutputName: "sum_v", Func: AggSum}})
+       if bindErr != nil {
+               t.Fatalf("bindAggReduceSpecs: %v", bindErr)
+       }
+       if got := specs[0].InputCol; got != 2 {
+               t.Fatalf("InputCol = %d, want FieldValue fallback column 2", got)
+       }
+       op := NewBatchAggregation(s, []int{1}, specs, AggModeReduce, 8, vectorized.NewMemoryTracker(1<<30), 0)
+       defer op.Close()
+       b := vectorized.NewRecordBatch(s, 2)
+       b.Columns[0].(*vectorized.TypedColumn[int64]).Append(1)
+       b.Columns[1].(*vectorized.TypedColumn[string]).Append("a")
+       b.Columns[2].(*vectorized.TypedColumn[*modelv1.FieldValue]).Append(&modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: 10}}})
+       b.Columns[0].(*vectorized.TypedColumn[int64]).Append(2)
+       b.Columns[1].(*vectorized.TypedColumn[string]).Append("a")
+       b.Columns[2].(*vectorized.TypedColumn[*modelv1.FieldValue]).Append(&modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: 20}}})
+       b.Len = 2
+       batches := feedReduce(t, op, b)
+       if len(batches) != 1 || batches[0].Len != 1 {
+               t.Fatalf("expected 1 output batch with 1 row, got %d batches", len(batches))
+       }
+       if got := batches[0].Columns[1].(*vectorized.TypedColumn[int64]).Data()[0]; got != 30 {
+               t.Fatalf("sum_v = %d, want 30", got)
+       }
+}
+
+// TestBatchAggregation_AggModeReduce_TagValueGroupKeyDoesNotCollapse is a
+// focused debug reproducer for distributed hidden-criteria queries: the data
+// node serializes partial aggregate rows through DataPoint egress, so the
+// liaison sees group keys as ColumnTypeTagValue passthrough cells. Distinct
+// TagValue keys must remain distinct during reduce.
+func TestBatchAggregation_AggModeReduce_TagValueGroupKeyDoesNotCollapse(t *testing.T) {
+       s := vectorized.NewBatchSchema([]vectorized.ColumnDef{
+               {Role: vectorized.RoleShardID, Name: shardIDOutputName, Type: vectorized.ColumnTypeInt64},
+               {Role: vectorized.RoleTag, TagFamily: "f", Name: "entity_id", Type: vectorized.ColumnTypeTagValue},
+               {Role: vectorized.RoleField, Name: "sum_v", Type: vectorized.ColumnTypeFieldValue},
+       })
+       specs, bindErr := bindAggReduceSpecs(s, []AggReduceSpec{{OutputName: "sum_v", Func: AggSum}})
+       if bindErr != nil {
+               t.Fatalf("bindAggReduceSpecs: %v", bindErr)
+       }
+       op := NewBatchAggregation(s, []int{1}, specs, AggModeReduce, 8, vectorized.NewMemoryTracker(1<<30), 0)
+       defer op.Close()
+       b := vectorized.NewRecordBatch(s, 2)
+       b.Columns[0].(*vectorized.TypedColumn[int64]).Append(1)
+       b.Columns[1].(*vectorized.TypedColumn[*modelv1.TagValue]).Append(&modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "e2e-service-consumer"}}})
+       b.Columns[2].(*vectorized.TypedColumn[*modelv1.FieldValue]).Append(&modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: 10}}})
+       b.Columns[0].(*vectorized.TypedColumn[int64]).Append(1)
+       b.Columns[1].(*vectorized.TypedColumn[*modelv1.TagValue]).Append(&modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "e2e-service-provider"}}})
+       b.Columns[2].(*vectorized.TypedColumn[*modelv1.FieldValue]).Append(&modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: 20}}})
+       b.Len = 2
+       batches := feedReduce(t, op, b)
+       if len(batches) != 1 || batches[0].Len != 2 {
+               t.Fatalf("expected 1 output batch with 2 rows, got %d batches and %d rows", len(batches), batches[0].Len)
+       }
+       gotKeys := make(map[string]struct{}, batches[0].Len)
+       keyCol := batches[0].Columns[0].(*vectorized.TypedColumn[*modelv1.TagValue])
+       for rowIdx := range batches[0].Len {
+               key := keyCol.Data()[rowIdx].GetStr().GetValue()
+               gotKeys[key] = struct{}{}
+       }
+       for _, wantKey := range []string{"e2e-service-consumer", "e2e-service-provider"} {
+               if _, ok := gotKeys[wantKey]; !ok {
+                       t.Fatalf("missing reduced group key %q in %v", wantKey, gotKeys)
+               }
+       }
+}
+
 // TestBatchAggregation_AggModeReduce_DedupsSameShardSameGroup asserts replica
 // duplicates — same shard_id + same group_key — collapse to a single
 // contribution. Two identical (shard=1, g=a, sum_v=10) rows yield sum=10.
diff --git a/pkg/query/vectorized/measure/groupby.go b/pkg/query/vectorized/measure/groupby.go
index 7904da051..eff23216e 100644
--- a/pkg/query/vectorized/measure/groupby.go
+++ b/pkg/query/vectorized/measure/groupby.go
@@ -251,35 +251,85 @@ func (g *BatchGroupBy) encodeKey(dst []byte, b *vectorized.RecordBatch, rowIdx i
 //   - float64:  8 little-endian bytes of math.Float64bits, with -0.0 → +0.0.
 //   - string:   4-byte little-endian length prefix + raw bytes.
 //   - bytes:    4-byte little-endian length prefix + raw bytes.
+//   - TagValue/FieldValue passthrough: same encoding as the scalar payload type.
 //
 // This helper is shared by BatchGroupBy.encodeKey and BatchAggregation.computeKey
 // so the two operators agree on key equivalence (Copilot G3 review issues 1+2).
 func appendKeyComponent(dst []byte, col vectorized.Column, rowIdx int) []byte {
        switch c := col.(type) {
        case *vectorized.TypedColumn[int64]:
-               var b [8]byte
-               binary.LittleEndian.PutUint64(b[:], uint64(c.Data()[rowIdx]))
-               return append(dst, b[:]...)
+               return appendIntKey(dst, c.Data()[rowIdx])
        case *vectorized.TypedColumn[float64]:
-               v := c.Data()[rowIdx]
-               if v == 0 {
-                       v = 0 // canonicalise -0.0 → +0.0 so they hash identically
-               }
-               var b [8]byte
-               binary.LittleEndian.PutUint64(b[:], math.Float64bits(v))
-               return append(dst, b[:]...)
+               return appendFloatKey(dst, c.Data()[rowIdx])
        case *vectorized.TypedColumn[string]:
-               s := c.Data()[rowIdx]
-               var lb [4]byte
-               binary.LittleEndian.PutUint32(lb[:], uint32(len(s)))
-               dst = append(dst, lb[:]...)
-               return append(dst, s...)
+               return appendStringKey(dst, c.Data()[rowIdx])
        case *vectorized.TypedColumn[[]byte]:
-               bs := c.Data()[rowIdx]
-               var lb [4]byte
-               binary.LittleEndian.PutUint32(lb[:], uint32(len(bs)))
-               dst = append(dst, lb[:]...)
-               return append(dst, bs...)
+               return appendBytesKey(dst, c.Data()[rowIdx])
+       case *vectorized.TypedColumn[*modelv1.TagValue]:
+               return appendTagValueKey(dst, c.Data()[rowIdx])
+       case *vectorized.TypedColumn[*modelv1.FieldValue]:
+               return appendFieldValueKey(dst, c.Data()[rowIdx])
+       }
+       return dst
+}
+
+func appendIntKey(dst []byte, value int64) []byte {
+       var b [8]byte
+       binary.LittleEndian.PutUint64(b[:], uint64(value))
+       return append(dst, b[:]...)
+}
+
+func appendFloatKey(dst []byte, value float64) []byte {
+       if value == 0 {
+               value = 0 // canonicalise -0.0 → +0.0 so they hash identically
+       }
+       var b [8]byte
+       binary.LittleEndian.PutUint64(b[:], math.Float64bits(value))
+       return append(dst, b[:]...)
+}
+
+func appendStringKey(dst []byte, value string) []byte {
+       var lb [4]byte
+       binary.LittleEndian.PutUint32(lb[:], uint32(len(value)))
+       dst = append(dst, lb[:]...)
+       return append(dst, value...)
+}
+
+func appendBytesKey(dst []byte, value []byte) []byte {
+       var lb [4]byte
+       binary.LittleEndian.PutUint32(lb[:], uint32(len(value)))
+       dst = append(dst, lb[:]...)
+       return append(dst, value...)
+}
+
+func appendTagValueKey(dst []byte, value *modelv1.TagValue) []byte {
+       if value == nil {
+               return dst
+       }
+       switch payload := value.GetValue().(type) {
+       case *modelv1.TagValue_Int:
+               return appendIntKey(dst, payload.Int.GetValue())
+       case *modelv1.TagValue_Str:
+               return appendStringKey(dst, payload.Str.GetValue())
+       case *modelv1.TagValue_BinaryData:
+               return appendBytesKey(dst, payload.BinaryData)
+       }
+       return dst
+}
+
+func appendFieldValueKey(dst []byte, value *modelv1.FieldValue) []byte {
+       if value == nil {
+               return dst
+       }
+       switch payload := value.GetValue().(type) {
+       case *modelv1.FieldValue_Int:
+               return appendIntKey(dst, payload.Int.GetValue())
+       case *modelv1.FieldValue_Float:
+               return appendFloatKey(dst, payload.Float.GetValue())
+       case *modelv1.FieldValue_Str:
+               return appendStringKey(dst, payload.Str.GetValue())
+       case *modelv1.FieldValue_BinaryData:
+               return appendBytesKey(dst, payload.BinaryData)
        }
        return dst
 }
diff --git a/pkg/query/vectorized/measure/reduce.go b/pkg/query/vectorized/measure/reduce.go
index 1a8c7d7e8..4a5433ea1 100644
--- a/pkg/query/vectorized/measure/reduce.go
+++ b/pkg/query/vectorized/measure/reduce.go
@@ -193,11 +193,21 @@ func bindAggReduceSpecs(schema *vectorized.BatchSchema, aggSpecs []AggReduceSpec
        out := make([]AggSpec, 0, len(aggSpecs))
        for _, ars := range aggSpecs {
                valueIdx := -1
+               fieldValueIdx := -1
                for i, def := range schema.Columns {
-                       if def.Role == vectorized.RoleField && def.Name == ars.OutputName {
+                       if def.Role != vectorized.RoleField || def.Name != ars.OutputName {
+                               continue
+                       }
+                       if isAggReduceValueType(def.Type) {
                                valueIdx = i
                                break
                        }
+                       if def.Type == vectorized.ColumnTypeFieldValue && fieldValueIdx < 0 {
+                               fieldValueIdx = i
+                       }
+               }
+               if valueIdx < 0 {
+                       valueIdx = fieldValueIdx
                }
                if valueIdx < 0 {
                        return nil, fmt.Errorf("agg output column %q not present in partial schema", ars.OutputName)
@@ -211,6 +221,10 @@ func bindAggReduceSpecs(schema *vectorized.BatchSchema, aggSpecs []AggReduceSpec
        return out, nil
 }
 
+func isAggReduceValueType(columnType vectorized.ColumnType) bool {
+       return columnType == vectorized.ColumnTypeInt64 || columnType == vectorized.ColumnTypeFloat64
+}
+
 // schemaCompatible asserts that two BatchSchemas describe the same column
 // layout — every per-column Name, Role and Type matches. The hard-cutover
 // model forbids schema drift across partials of the same query, so any
diff --git a/test/e2e-v2/cases/cluster/docker-compose.yml b/test/e2e-v2/cases/cluster/docker-compose.yml
index 0e6585b84..e4e899a24 100644
--- a/test/e2e-v2/cases/cluster/docker-compose.yml
+++ b/test/e2e-v2/cases/cluster/docker-compose.yml
@@ -20,7 +20,7 @@ services:
     extends:
       file: ../../script/docker-compose/base-compose.yml
       service: data
-    command: data --node-discovery-mode=file --node-discovery-file-path=/etc/banyandb/nodes.yaml
+    command: data --node-discovery-mode=file --node-discovery-file-path=/etc/banyandb/nodes.yaml ${BANYANDB_MEASURE_VECTORIZED_FLAGS:---measure-vectorized-enabled=false}
     volumes:
       - ./nodes.yaml:/etc/banyandb/nodes.yaml
     networks:
@@ -30,7 +30,7 @@ services:
     extends:
       file: ../../script/docker-compose/base-compose.yml
       service: liaison
-    command: liaison --node-discovery-mode=file --node-discovery-file-path=/etc/banyandb/nodes.yaml
+    command: liaison --node-discovery-mode=file --node-discovery-file-path=/etc/banyandb/nodes.yaml ${BANYANDB_MEASURE_VECTORIZED_FLAGS:---measure-vectorized-enabled=false}
     volumes:
       - ./nodes.yaml:/etc/banyandb/nodes.yaml
     networks:
diff --git a/test/e2e-v2/cases/event/banyandb/docker-compose.yml b/test/e2e-v2/cases/event/banyandb/docker-compose.yml
index 62c9575bf..535169b93 100644
--- a/test/e2e-v2/cases/event/banyandb/docker-compose.yml
+++ b/test/e2e-v2/cases/event/banyandb/docker-compose.yml
@@ -18,7 +18,7 @@ services:
     extends:
       file: ../../../script/docker-compose/base-compose.yml
       service: banyandb
-    command: standalone
+    command: standalone ${BANYANDB_MEASURE_VECTORIZED_FLAGS:---measure-vectorized-enabled=false}
     networks:
       - e2e
 
diff --git a/test/e2e-v2/cases/lifecycle/data-generator/docker-compose.yml b/test/e2e-v2/cases/lifecycle/data-generator/docker-compose.yml
index 98f1078ac..d480d75b0 100644
--- a/test/e2e-v2/cases/lifecycle/data-generator/docker-compose.yml
+++ b/test/e2e-v2/cases/lifecycle/data-generator/docker-compose.yml
@@ -21,7 +21,7 @@ services:
     ports:
       - 17912:17912
       - 17913:17913
-    command: standalone --measure-metadata-cache-wait-duration 1m --stream-metadata-cache-wait-duration 1m
+    command: standalone --measure-metadata-cache-wait-duration 1m --stream-metadata-cache-wait-duration 1m ${BANYANDB_MEASURE_VECTORIZED_FLAGS:---measure-vectorized-enabled=false}
     networks:
       - e2e
 
diff --git a/test/e2e-v2/cases/lifecycle/docker-compose.yml b/test/e2e-v2/cases/lifecycle/docker-compose.yml
index a5fcaee1e..77991c147 100644
--- a/test/e2e-v2/cases/lifecycle/docker-compose.yml
+++ b/test/e2e-v2/cases/lifecycle/docker-compose.yml
@@ -19,7 +19,7 @@ services:
       file: ../../script/docker-compose/base-compose.yml
       service: data
     hostname: data-hot1
-    command: data --node-discovery-mode=file --node-discovery-file-path=/etc/banyandb/nodes.yaml --node-labels type=hot
+    command: data --node-discovery-mode=file --node-discovery-file-path=/etc/banyandb/nodes.yaml --node-labels type=hot ${BANYANDB_MEASURE_VECTORIZED_FLAGS:---measure-vectorized-enabled=false}
     volumes:
       - ./data-generator/tmp/metadata:/tmp/measure/data/sw_metadata
       - ./nodes.yaml:/etc/banyandb/nodes.yaml
@@ -31,7 +31,7 @@ services:
       file: ../../script/docker-compose/base-compose.yml
       service: data
     hostname: data-warm1
-    command: data --node-discovery-mode=file --node-discovery-file-path=/etc/banyandb/nodes.yaml --node-labels type=warm
+    command: data --node-discovery-mode=file --node-discovery-file-path=/etc/banyandb/nodes.yaml --node-labels type=warm ${BANYANDB_MEASURE_VECTORIZED_FLAGS:---measure-vectorized-enabled=false}
     volumes:
       - ./nodes.yaml:/etc/banyandb/nodes.yaml
     networks:
@@ -42,7 +42,7 @@ services:
       file: ../../script/docker-compose/base-compose.yml
       service: data
     hostname: data-cold1
-    command: data --node-discovery-mode=file --node-discovery-file-path=/etc/banyandb/nodes.yaml --node-labels type=cold
+    command: data --node-discovery-mode=file --node-discovery-file-path=/etc/banyandb/nodes.yaml --node-labels type=cold ${BANYANDB_MEASURE_VECTORIZED_FLAGS:---measure-vectorized-enabled=false}
     volumes:
       - ./data-generator/tmp/measure:/tmp/measure
       - ./data-generator/tmp/stream:/tmp/stream
@@ -56,7 +56,7 @@ services:
     extends:
       file: ../../script/docker-compose/base-compose.yml
       service: liaison
-    command: liaison --node-discovery-mode=file --node-discovery-file-path=/etc/banyandb/nodes.yaml --data-node-selector type=hot
+    command: liaison --node-discovery-mode=file --node-discovery-file-path=/etc/banyandb/nodes.yaml --data-node-selector type=hot ${BANYANDB_MEASURE_VECTORIZED_FLAGS:---measure-vectorized-enabled=false}
     volumes:
       - ./nodes.yaml:/etc/banyandb/nodes.yaml
     networks:
diff --git a/test/e2e-v2/cases/log/banyandb/docker-compose.yml b/test/e2e-v2/cases/log/banyandb/docker-compose.yml
index 79eb59a3c..faab41b86 100644
--- a/test/e2e-v2/cases/log/banyandb/docker-compose.yml
+++ b/test/e2e-v2/cases/log/banyandb/docker-compose.yml
@@ -20,6 +20,7 @@ services:
     extends:
       file: ../../../script/docker-compose/base-compose.yml
       service: banyandb
+    command: standalone ${BANYANDB_MEASURE_VECTORIZED_FLAGS:---measure-vectorized-enabled=false}
     networks:
       - e2e
 
diff --git a/test/e2e-v2/cases/profiling/trace/banyandb/docker-compose.yml b/test/e2e-v2/cases/profiling/trace/banyandb/docker-compose.yml
index e09b185f8..7cf389453 100644
--- a/test/e2e-v2/cases/profiling/trace/banyandb/docker-compose.yml
+++ b/test/e2e-v2/cases/profiling/trace/banyandb/docker-compose.yml
@@ -20,7 +20,7 @@ services:
     extends:
       file: ../../../../script/docker-compose/base-compose.yml
       service: banyandb
-    command: standalone
+    command: standalone ${BANYANDB_MEASURE_VECTORIZED_FLAGS:---measure-vectorized-enabled=false}
     networks:
       - e2e
 
diff --git a/test/e2e-v2/cases/rover/process/istio/banyandb/banyandb-deployment.yaml b/test/e2e-v2/cases/rover/process/istio/banyandb/banyandb-deployment-vec.yaml
similarity index 97%
copy from test/e2e-v2/cases/rover/process/istio/banyandb/banyandb-deployment.yaml
copy to test/e2e-v2/cases/rover/process/istio/banyandb/banyandb-deployment-vec.yaml
index 0c366a8fe..3176c00d0 100644
--- a/test/e2e-v2/cases/rover/process/istio/banyandb/banyandb-deployment.yaml
+++ b/test/e2e-v2/cases/rover/process/istio/banyandb/banyandb-deployment-vec.yaml
@@ -36,6 +36,7 @@ spec:
           imagePullPolicy: IfNotPresent
           args:
             - standalone
+            - --measure-vectorized-enabled=true
           ports:
             - name: grpc
               containerPort: 17912
diff --git a/test/e2e-v2/cases/rover/process/istio/banyandb/banyandb-deployment.yaml b/test/e2e-v2/cases/rover/process/istio/banyandb/banyandb-deployment.yaml
index 0c366a8fe..86bbfb038 100644
--- a/test/e2e-v2/cases/rover/process/istio/banyandb/banyandb-deployment.yaml
+++ b/test/e2e-v2/cases/rover/process/istio/banyandb/banyandb-deployment.yaml
@@ -36,6 +36,7 @@ spec:
           imagePullPolicy: IfNotPresent
           args:
             - standalone
+            - --measure-vectorized-enabled=false
           ports:
             - name: grpc
               containerPort: 17912
diff --git a/test/e2e-v2/cases/rover/process/istio/banyandb/e2e-banyandb.yaml b/test/e2e-v2/cases/rover/process/istio/banyandb/e2e-banyandb.yaml
index 200167081..c9816645a 100644
--- a/test/e2e-v2/cases/rover/process/istio/banyandb/e2e-banyandb.yaml
+++ b/test/e2e-v2/cases/rover/process/istio/banyandb/e2e-banyandb.yaml
@@ -48,7 +48,7 @@ setup:
     - name: Load BanyanDB image into kind
       command: kind load docker-image 
apache/skywalking-banyandb:${TAG}-testing --name kind
     - name: Deploy BanyanDB
-      command: envsubst < test/e2e-v2/cases/rover/process/istio/banyandb/banyandb-deployment.yaml | kubectl apply -f -
+      command: envsubst < ${BANYANDB_DEPLOYMENT_FILE:-test/e2e-v2/cases/rover/process/istio/banyandb/banyandb-deployment.yaml} | kubectl apply -f -
     - name: Wait for BanyanDB
       command: kubectl rollout status deployment/banyandb -n istio-system 
--timeout=120s
     - name: Install SkyWalking
diff --git a/test/e2e-v2/cases/storage/banyandb/docker-compose.yml b/test/e2e-v2/cases/storage/banyandb/docker-compose.yml
index 8d12c8e83..a9942588c 100644
--- a/test/e2e-v2/cases/storage/banyandb/docker-compose.yml
+++ b/test/e2e-v2/cases/storage/banyandb/docker-compose.yml
@@ -20,7 +20,7 @@ services:
     extends:
       file: ../../../script/docker-compose/base-compose.yml
       service: banyandb
-    command: standalone
+    command: standalone ${BANYANDB_MEASURE_VECTORIZED_FLAGS:---measure-vectorized-enabled=false}
     networks:
       - e2e
 
diff --git a/test/e2e-v2/cases/zipkin/banyandb/docker-compose.yml b/test/e2e-v2/cases/zipkin/banyandb/docker-compose.yml
index 67ddc3fee..e09297e9d 100644
--- a/test/e2e-v2/cases/zipkin/banyandb/docker-compose.yml
+++ b/test/e2e-v2/cases/zipkin/banyandb/docker-compose.yml
@@ -20,6 +20,7 @@ services:
     extends:
       file: ../../../script/docker-compose/base-compose.yml
       service: banyandb
+    command: standalone ${BANYANDB_MEASURE_VECTORIZED_FLAGS:---measure-vectorized-enabled=false}
     networks:
       - e2e
 
