This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch vectorized-query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit d752f4da65ce028a299b15c6d95b1fac9bb26287 Author: Hongtao Gao <[email protected]> AuthorDate: Sat May 23 01:27:00 2026 +0000 feat(e2e+vec/measure): wire vec query into the OAP e2e matrix and harden aggregation reduce against passthrough columns This bundles two halves of the same effort: 1. e2e matrix gains a vec lane. Every OAP e2e suite (Storage, Trace Profiling, Event, Log, Zipkin, Cluster, Rover eBPF/Kind) now runs twice: once on the row path and once on the vec path. The suites are expressed as a 2-D matrix (test × engine) so the (suite, engine) cross-product expands without literal duplication. Job names stay 1-for-1 with the previous hand-rolled list (row variant unsuffixed, vec variant " Vec"), preserving any dashboard/regex matching over the labels. The vec engine sets BANYANDB_MEASURE_VECTORIZED_FLAGS=--measure-vectorized-enabled=true and BANYANDB_DEPLOYMENT_FILE=.../banyandb-deployment-vec.yaml. The latter is inert for non-Rover suites: only the Rover Kind e2e-banyandb.yaml reads it, as the shell-expanded path of the manifest it pipes through envsubst (falling back to banyandb-deployment.yaml when unset); the non-Rover suites use docker-compose and read BANYANDB_MEASURE_VECTORIZED_FLAGS instead. Every docker-compose under test/e2e-v2/cases threads that env knob through with a row-default fallback (--measure-vectorized-enabled=false), so the row lane stays on the row path, now with the disable flag passed explicitly. A new banyandb-deployment-vec.yaml sibling K8s manifest hard-codes --measure-vectorized-enabled=true for the Rover Kind harness. 2. Vec measure operators handle TagValue/FieldValue passthrough cells. When a distributed reduce sees partials that travelled through the DataPoint egress (or that arrived with row-compatible passthrough columns), the group key and partial-field cells land as ColumnTypeTagValue / ColumnTypeFieldValue instead of native scalar columns. Three concrete failure modes: * BatchAggregation.combinePartial type-asserted the partial column to TypedColumn[int64|float64] and panicked on FieldValue.
Fix: route through new partialIntValue / partialFloatValue decoders that unwrap the FieldValue oneof. * appendKeyComponent had no case for TagValue / FieldValue, so distinct entity-id keys arriving as passthrough cells collapsed to the same (empty) hash. Fix: per-type sub-helpers (appendIntKey / appendFloatKey / appendStringKey / appendBytesKey) plus appendTagValueKey / appendFieldValueKey that route by oneof variant, so distinct int, string and binary keys (and float keys for FieldValue) stay distinct; nil cells and unhandled variants still contribute no key bytes. * bindAggReduceSpecs picked the first column matching the agg output name even when a numeric duplicate existed alongside a passthrough field. Fix: prefer the numeric (Int64/Float64) column when present; fall back to the FieldValue passthrough when only the passthrough is in the partial schema. Relatedly, aggOutputType now maps a ColumnTypeFieldValue input to a ColumnTypeInt64 output, so a reduce bound to a FieldValue passthrough emits int64 results. Three new unit tests in aggregation_test.go cover (a) numeric-preferred binding when a duplicate passthrough exists, (b) FieldValue fallback, (c) TagValue group-key collapse guard. via [HAPI](https://hapi.run) Co-Authored-By: HAPI <[email protected]> --- .github/workflows/e2e.yml | 22 +++- pkg/query/vectorized/measure/aggregation.go | 3 + pkg/query/vectorized/measure/aggregation_reduce.go | 61 +++++++++-- pkg/query/vectorized/measure/aggregation_test.go | 118 +++++++++++++++++++++ pkg/query/vectorized/measure/groupby.go | 90 ++++++++++++---- pkg/query/vectorized/measure/reduce.go | 16 ++- test/e2e-v2/cases/cluster/docker-compose.yml | 4 +- .../e2e-v2/cases/event/banyandb/docker-compose.yml | 2 +- .../lifecycle/data-generator/docker-compose.yml | 2 +- test/e2e-v2/cases/lifecycle/docker-compose.yml | 8 +- test/e2e-v2/cases/log/banyandb/docker-compose.yml | 1 + .../profiling/trace/banyandb/docker-compose.yml | 2 +- ...eployment.yaml => banyandb-deployment-vec.yaml} | 1 + .../istio/banyandb/banyandb-deployment.yaml | 1 + .../rover/process/istio/banyandb/e2e-banyandb.yaml | 2 +- .../cases/storage/banyandb/docker-compose.yml | 2 +- .../cases/zipkin/banyandb/docker-compose.yml | 1 + 17 files changed, 294 insertions(+), 42 deletions(-) diff --git
a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index 81ba74f61..d205d69e6 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -48,12 +48,20 @@ jobs: tag: ${{ github.sha }} StoragePlugins: - name: ${{ matrix.test.name }} + name: ${{ matrix.test.name }}${{ matrix.engine.suffix }} runs-on: ${{ matrix.test.runs-on || 'ubuntu-latest' }} timeout-minutes: 90 needs: [DockerImage] strategy: fail-fast: false + # Two-dimensional matrix: (suite × engine). The `engine` axis pivots + # every suite into a (row, vec) pair without literal duplication. The + # row variant keeps the historical CI label (empty suffix); the vec + # variant appends " Vec" so existing dashboards/regex over job names + # keep matching. + # + # BANYANDB_DEPLOYMENT_FILE in the vec engine env is consumed only by + # the Rover e2e-banyandb.yaml; setting it on other vec runs is inert. matrix: test: - name: Storage @@ -64,6 +72,8 @@ jobs: config: test/e2e-v2/cases/event/banyandb/e2e.yaml - name: Log config: test/e2e-v2/cases/log/banyandb/e2e.yaml + - name: Zipkin + config: test/e2e-v2/cases/zipkin/banyandb/e2e.yaml - name: Cluster config: test/e2e-v2/cases/cluster/e2e.yaml - name: Rover eBPF - Istio mesh, Kind @@ -75,6 +85,12 @@ jobs: # TODO: Enable Lifecycle test when it is ready # - name: Lifecycle # config: test/e2e-v2/cases/lifecycle/e2e.yaml + engine: + - suffix: "" + - suffix: " Vec" + env: | + BANYANDB_MEASURE_VECTORIZED_FLAGS=--measure-vectorized-enabled=true + BANYANDB_DEPLOYMENT_FILE=test/e2e-v2/cases/rover/process/istio/banyandb/banyandb-deployment-vec.yaml env: TAG: ${{ github.sha }} steps: @@ -105,7 +121,9 @@ jobs: username: ${{ github.repository_owner }} password: ${{ secrets.GITHUB_TOKEN }} - name: Set env var - run: echo "${{ matrix.test.env }}" >> $GITHUB_ENV + run: | + echo "${{ matrix.test.env }}" >> $GITHUB_ENV + echo "${{ matrix.engine.env }}" >> $GITHUB_ENV - name: Generate data for lifecycle if: ${{ matrix.test.name == 'Lifecycle' }} run: | diff --git 
a/pkg/query/vectorized/measure/aggregation.go b/pkg/query/vectorized/measure/aggregation.go index e625ffef8..3244ad300 100644 --- a/pkg/query/vectorized/measure/aggregation.go +++ b/pkg/query/vectorized/measure/aggregation.go @@ -657,5 +657,8 @@ func findShardIDIndex(schema *vectorized.BatchSchema) int { // FIELD_TYPE_FLOAT → float64 → FieldValue_Float; see // measure_plan_aggregation.go and pkg/query/aggregation). func aggOutputType(in vectorized.ColumnType, _ AggFunc) vectorized.ColumnType { + if in == vectorized.ColumnTypeFieldValue { + return vectorized.ColumnTypeInt64 + } return in } diff --git a/pkg/query/vectorized/measure/aggregation_reduce.go b/pkg/query/vectorized/measure/aggregation_reduce.go index 8aa264306..3d14e6cd5 100644 --- a/pkg/query/vectorized/measure/aggregation_reduce.go +++ b/pkg/query/vectorized/measure/aggregation_reduce.go @@ -20,6 +20,7 @@ package measure import ( "encoding/binary" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/pkg/query/aggregation" "github.com/apache/skywalking-banyandb/pkg/query/vectorized" ) @@ -122,24 +123,68 @@ func (a *BatchAggregation) combinePartial(b *vectorized.RecordBatch, rowIdx int, return } if slot.inputIsFloat { - valueCol := col.(*vectorized.TypedColumn[float64]) - p := aggregation.Partial[float64]{Value: valueCol.Data()[rowIdx]} + p := aggregation.Partial[float64]{Value: partialFloatValue(col, rowIdx)} if countIdx >= 0 { - countCol := b.Columns[countIdx].(*vectorized.TypedColumn[float64]) - p.Count = countCol.Data()[rowIdx] + p.Count = partialFloatValue(b.Columns[countIdx], rowIdx) } slot.floatReduce.Combine(p) return } - valueCol := col.(*vectorized.TypedColumn[int64]) - p := aggregation.Partial[int64]{Value: valueCol.Data()[rowIdx]} + p := aggregation.Partial[int64]{Value: partialIntValue(col, rowIdx)} if countIdx >= 0 { - countCol := b.Columns[countIdx].(*vectorized.TypedColumn[int64]) - p.Count = countCol.Data()[rowIdx] + p.Count = 
partialIntValue(b.Columns[countIdx], rowIdx) } slot.intReduce.Combine(p) } +func partialIntValue(col vectorized.Column, rowIdx int) int64 { + switch c := col.(type) { + case *vectorized.TypedColumn[int64]: + return c.Data()[rowIdx] + case *vectorized.TypedColumn[float64]: + return int64(c.Data()[rowIdx]) + case *vectorized.TypedColumn[*modelv1.FieldValue]: + fv := c.Data()[rowIdx] + if fv == nil { + return 0 + } + switch value := fv.GetValue().(type) { + case *modelv1.FieldValue_Int: + return value.Int.GetValue() + case *modelv1.FieldValue_Float: + return int64(value.Float.GetValue()) + } + return 0 + default: + valueCol := col.(*vectorized.TypedColumn[int64]) + return valueCol.Data()[rowIdx] + } +} + +func partialFloatValue(col vectorized.Column, rowIdx int) float64 { + switch c := col.(type) { + case *vectorized.TypedColumn[float64]: + return c.Data()[rowIdx] + case *vectorized.TypedColumn[int64]: + return float64(c.Data()[rowIdx]) + case *vectorized.TypedColumn[*modelv1.FieldValue]: + fv := c.Data()[rowIdx] + if fv == nil { + return 0 + } + switch value := fv.GetValue().(type) { + case *modelv1.FieldValue_Float: + return value.Float.GetValue() + case *modelv1.FieldValue_Int: + return float64(value.Int.GetValue()) + } + return 0 + default: + valueCol := col.(*vectorized.TypedColumn[float64]) + return valueCol.Data()[rowIdx] + } +} + // writeReduce emits the slot's reduced final value to the typed output // column. Mirrors aggSlot.write but reads from the Reduce accumulator // (whose Val() handles MEAN finalization by dividing Sum by Count). diff --git a/pkg/query/vectorized/measure/aggregation_test.go b/pkg/query/vectorized/measure/aggregation_test.go index b66d0aafd..141aaba53 100644 --- a/pkg/query/vectorized/measure/aggregation_test.go +++ b/pkg/query/vectorized/measure/aggregation_test.go @@ -680,6 +680,124 @@ func TestBatchAggregation_AggModeReduce_CombinesPartialsAcrossShards(t *testing. 
} } +// TestBatchAggregation_AggModeReduce_BindsNumericDuplicateField asserts the +// liaison reduce binds the numeric partial column when a schema also carries a +// passthrough field with the same name. Distributed multi-source schemas may +// carry row-compatible passthrough fields alongside the AggModeMap partial; the +// reducer must not pick the passthrough column and panic on type assertion. +func TestBatchAggregation_AggModeReduce_BindsNumericDuplicateField(t *testing.T) { + s := vectorized.NewBatchSchema([]vectorized.ColumnDef{ + {Role: vectorized.RoleShardID, Name: shardIDOutputName, Type: vectorized.ColumnTypeInt64}, + {Role: vectorized.RoleTag, TagFamily: "f", Name: "g", Type: vectorized.ColumnTypeString}, + {Role: vectorized.RoleField, Name: "sum_v", Type: vectorized.ColumnTypeFieldValue}, + {Role: vectorized.RoleField, Name: "sum_v", Type: vectorized.ColumnTypeInt64}, + }) + specs, bindErr := bindAggReduceSpecs(s, []AggReduceSpec{{OutputName: "sum_v", Func: AggSum}}) + if bindErr != nil { + t.Fatalf("bindAggReduceSpecs: %v", bindErr) + } + if got := specs[0].InputCol; got != 3 { + t.Fatalf("InputCol = %d, want numeric duplicate column 3", got) + } + op := NewBatchAggregation(s, []int{1}, specs, AggModeReduce, 8, vectorized.NewMemoryTracker(1<<30), 0) + defer op.Close() + b := vectorized.NewRecordBatch(s, 2) + b.Columns[0].(*vectorized.TypedColumn[int64]).Append(1) + b.Columns[1].(*vectorized.TypedColumn[string]).Append("a") + b.Columns[2].(*vectorized.TypedColumn[*modelv1.FieldValue]).Append(nil) + b.Columns[3].(*vectorized.TypedColumn[int64]).Append(10) + b.Columns[0].(*vectorized.TypedColumn[int64]).Append(2) + b.Columns[1].(*vectorized.TypedColumn[string]).Append("a") + b.Columns[2].(*vectorized.TypedColumn[*modelv1.FieldValue]).Append(nil) + b.Columns[3].(*vectorized.TypedColumn[int64]).Append(20) + b.Len = 2 + batches := feedReduce(t, op, b) + if len(batches) != 1 || batches[0].Len != 1 { + t.Fatalf("expected 1 output batch with 1 row, got %d 
batches", len(batches)) + } + if got := batches[0].Columns[1].(*vectorized.TypedColumn[int64]).Data()[0]; got != 30 { + t.Fatalf("sum_v = %d, want 30", got) + } +} + +// TestBatchAggregation_AggModeReduce_BindsFieldValueFallback covers the hidden +// criteria path where a data-node iterator wrapper serializes partial rows +// through DataPoint egress, so the wire frame carries FieldValue passthrough +// columns instead of native numeric partial columns. +func TestBatchAggregation_AggModeReduce_BindsFieldValueFallback(t *testing.T) { + s := vectorized.NewBatchSchema([]vectorized.ColumnDef{ + {Role: vectorized.RoleShardID, Name: shardIDOutputName, Type: vectorized.ColumnTypeInt64}, + {Role: vectorized.RoleTag, TagFamily: "f", Name: "g", Type: vectorized.ColumnTypeString}, + {Role: vectorized.RoleField, Name: "sum_v", Type: vectorized.ColumnTypeFieldValue}, + }) + specs, bindErr := bindAggReduceSpecs(s, []AggReduceSpec{{OutputName: "sum_v", Func: AggSum}}) + if bindErr != nil { + t.Fatalf("bindAggReduceSpecs: %v", bindErr) + } + if got := specs[0].InputCol; got != 2 { + t.Fatalf("InputCol = %d, want FieldValue fallback column 2", got) + } + op := NewBatchAggregation(s, []int{1}, specs, AggModeReduce, 8, vectorized.NewMemoryTracker(1<<30), 0) + defer op.Close() + b := vectorized.NewRecordBatch(s, 2) + b.Columns[0].(*vectorized.TypedColumn[int64]).Append(1) + b.Columns[1].(*vectorized.TypedColumn[string]).Append("a") + b.Columns[2].(*vectorized.TypedColumn[*modelv1.FieldValue]).Append(&modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: 10}}}) + b.Columns[0].(*vectorized.TypedColumn[int64]).Append(2) + b.Columns[1].(*vectorized.TypedColumn[string]).Append("a") + b.Columns[2].(*vectorized.TypedColumn[*modelv1.FieldValue]).Append(&modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: 20}}}) + b.Len = 2 + batches := feedReduce(t, op, b) + if len(batches) != 1 || batches[0].Len != 1 { + t.Fatalf("expected 1 output batch with 1 
row, got %d batches", len(batches)) + } + if got := batches[0].Columns[1].(*vectorized.TypedColumn[int64]).Data()[0]; got != 30 { + t.Fatalf("sum_v = %d, want 30", got) + } +} + +// TestBatchAggregation_AggModeReduce_TagValueGroupKeyDoesNotCollapse is a +// focused debug reproducer for distributed hidden-criteria queries: the data +// node serializes partial aggregate rows through DataPoint egress, so the +// liaison sees group keys as ColumnTypeTagValue passthrough cells. Distinct +// TagValue keys must remain distinct during reduce. +func TestBatchAggregation_AggModeReduce_TagValueGroupKeyDoesNotCollapse(t *testing.T) { + s := vectorized.NewBatchSchema([]vectorized.ColumnDef{ + {Role: vectorized.RoleShardID, Name: shardIDOutputName, Type: vectorized.ColumnTypeInt64}, + {Role: vectorized.RoleTag, TagFamily: "f", Name: "entity_id", Type: vectorized.ColumnTypeTagValue}, + {Role: vectorized.RoleField, Name: "sum_v", Type: vectorized.ColumnTypeFieldValue}, + }) + specs, bindErr := bindAggReduceSpecs(s, []AggReduceSpec{{OutputName: "sum_v", Func: AggSum}}) + if bindErr != nil { + t.Fatalf("bindAggReduceSpecs: %v", bindErr) + } + op := NewBatchAggregation(s, []int{1}, specs, AggModeReduce, 8, vectorized.NewMemoryTracker(1<<30), 0) + defer op.Close() + b := vectorized.NewRecordBatch(s, 2) + b.Columns[0].(*vectorized.TypedColumn[int64]).Append(1) + b.Columns[1].(*vectorized.TypedColumn[*modelv1.TagValue]).Append(&modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "e2e-service-consumer"}}}) + b.Columns[2].(*vectorized.TypedColumn[*modelv1.FieldValue]).Append(&modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: 10}}}) + b.Columns[0].(*vectorized.TypedColumn[int64]).Append(1) + b.Columns[1].(*vectorized.TypedColumn[*modelv1.TagValue]).Append(&modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "e2e-service-provider"}}}) + b.Columns[2].(*vectorized.TypedColumn[*modelv1.FieldValue]).Append(&modelv1.FieldValue{Value: 
&modelv1.FieldValue_Int{Int: &modelv1.Int{Value: 20}}}) + b.Len = 2 + batches := feedReduce(t, op, b) + if len(batches) != 1 || batches[0].Len != 2 { + t.Fatalf("expected 1 output batch with 2 rows, got %d batches and %d rows", len(batches), batches[0].Len) + } + gotKeys := make(map[string]struct{}, batches[0].Len) + keyCol := batches[0].Columns[0].(*vectorized.TypedColumn[*modelv1.TagValue]) + for rowIdx := range batches[0].Len { + key := keyCol.Data()[rowIdx].GetStr().GetValue() + gotKeys[key] = struct{}{} + } + for _, wantKey := range []string{"e2e-service-consumer", "e2e-service-provider"} { + if _, ok := gotKeys[wantKey]; !ok { + t.Fatalf("missing reduced group key %q in %v", wantKey, gotKeys) + } + } +} + // TestBatchAggregation_AggModeReduce_DedupsSameShardSameGroup asserts replica // duplicates — same shard_id + same group_key — collapse to a single // contribution. Two identical (shard=1, g=a, sum_v=10) rows yield sum=10. diff --git a/pkg/query/vectorized/measure/groupby.go b/pkg/query/vectorized/measure/groupby.go index 7904da051..eff23216e 100644 --- a/pkg/query/vectorized/measure/groupby.go +++ b/pkg/query/vectorized/measure/groupby.go @@ -251,35 +251,85 @@ func (g *BatchGroupBy) encodeKey(dst []byte, b *vectorized.RecordBatch, rowIdx i // - float64: 8 little-endian bytes of math.Float64bits, with -0.0 → +0.0. // - string: 4-byte little-endian length prefix + raw bytes. // - bytes: 4-byte little-endian length prefix + raw bytes. +// - TagValue/FieldValue passthrough: same encoding as the scalar payload type. // // This helper is shared by BatchGroupBy.encodeKey and BatchAggregation.computeKey // so the two operators agree on key equivalence (Copilot G3 review issues 1+2). func appendKeyComponent(dst []byte, col vectorized.Column, rowIdx int) []byte { switch c := col.(type) { case *vectorized.TypedColumn[int64]: - var b [8]byte - binary.LittleEndian.PutUint64(b[:], uint64(c.Data()[rowIdx])) - return append(dst, b[:]...) 
+ return appendIntKey(dst, c.Data()[rowIdx]) case *vectorized.TypedColumn[float64]: - v := c.Data()[rowIdx] - if v == 0 { - v = 0 // canonicalise -0.0 → +0.0 so they hash identically - } - var b [8]byte - binary.LittleEndian.PutUint64(b[:], math.Float64bits(v)) - return append(dst, b[:]...) + return appendFloatKey(dst, c.Data()[rowIdx]) case *vectorized.TypedColumn[string]: - s := c.Data()[rowIdx] - var lb [4]byte - binary.LittleEndian.PutUint32(lb[:], uint32(len(s))) - dst = append(dst, lb[:]...) - return append(dst, s...) + return appendStringKey(dst, c.Data()[rowIdx]) case *vectorized.TypedColumn[[]byte]: - bs := c.Data()[rowIdx] - var lb [4]byte - binary.LittleEndian.PutUint32(lb[:], uint32(len(bs))) - dst = append(dst, lb[:]...) - return append(dst, bs...) + return appendBytesKey(dst, c.Data()[rowIdx]) + case *vectorized.TypedColumn[*modelv1.TagValue]: + return appendTagValueKey(dst, c.Data()[rowIdx]) + case *vectorized.TypedColumn[*modelv1.FieldValue]: + return appendFieldValueKey(dst, c.Data()[rowIdx]) + } + return dst +} + +func appendIntKey(dst []byte, value int64) []byte { + var b [8]byte + binary.LittleEndian.PutUint64(b[:], uint64(value)) + return append(dst, b[:]...) +} + +func appendFloatKey(dst []byte, value float64) []byte { + if value == 0 { + value = 0 // canonicalise -0.0 → +0.0 so they hash identically + } + var b [8]byte + binary.LittleEndian.PutUint64(b[:], math.Float64bits(value)) + return append(dst, b[:]...) +} + +func appendStringKey(dst []byte, value string) []byte { + var lb [4]byte + binary.LittleEndian.PutUint32(lb[:], uint32(len(value))) + dst = append(dst, lb[:]...) + return append(dst, value...) +} + +func appendBytesKey(dst []byte, value []byte) []byte { + var lb [4]byte + binary.LittleEndian.PutUint32(lb[:], uint32(len(value))) + dst = append(dst, lb[:]...) + return append(dst, value...) 
+} + +func appendTagValueKey(dst []byte, value *modelv1.TagValue) []byte { + if value == nil { + return dst + } + switch payload := value.GetValue().(type) { + case *modelv1.TagValue_Int: + return appendIntKey(dst, payload.Int.GetValue()) + case *modelv1.TagValue_Str: + return appendStringKey(dst, payload.Str.GetValue()) + case *modelv1.TagValue_BinaryData: + return appendBytesKey(dst, payload.BinaryData) + } + return dst +} + +func appendFieldValueKey(dst []byte, value *modelv1.FieldValue) []byte { + if value == nil { + return dst + } + switch payload := value.GetValue().(type) { + case *modelv1.FieldValue_Int: + return appendIntKey(dst, payload.Int.GetValue()) + case *modelv1.FieldValue_Float: + return appendFloatKey(dst, payload.Float.GetValue()) + case *modelv1.FieldValue_Str: + return appendStringKey(dst, payload.Str.GetValue()) + case *modelv1.FieldValue_BinaryData: + return appendBytesKey(dst, payload.BinaryData) } return dst } diff --git a/pkg/query/vectorized/measure/reduce.go b/pkg/query/vectorized/measure/reduce.go index 1a8c7d7e8..4a5433ea1 100644 --- a/pkg/query/vectorized/measure/reduce.go +++ b/pkg/query/vectorized/measure/reduce.go @@ -193,11 +193,21 @@ func bindAggReduceSpecs(schema *vectorized.BatchSchema, aggSpecs []AggReduceSpec out := make([]AggSpec, 0, len(aggSpecs)) for _, ars := range aggSpecs { valueIdx := -1 + fieldValueIdx := -1 for i, def := range schema.Columns { - if def.Role == vectorized.RoleField && def.Name == ars.OutputName { + if def.Role != vectorized.RoleField || def.Name != ars.OutputName { + continue + } + if isAggReduceValueType(def.Type) { valueIdx = i break } + if def.Type == vectorized.ColumnTypeFieldValue && fieldValueIdx < 0 { + fieldValueIdx = i + } + } + if valueIdx < 0 { + valueIdx = fieldValueIdx } if valueIdx < 0 { return nil, fmt.Errorf("agg output column %q not present in partial schema", ars.OutputName) @@ -211,6 +221,10 @@ func bindAggReduceSpecs(schema *vectorized.BatchSchema, aggSpecs []AggReduceSpec return 
out, nil } +func isAggReduceValueType(columnType vectorized.ColumnType) bool { + return columnType == vectorized.ColumnTypeInt64 || columnType == vectorized.ColumnTypeFloat64 +} + // schemaCompatible asserts that two BatchSchemas describe the same column // layout — every per-column Name, Role and Type matches. The hard-cutover // model forbids schema drift across partials of the same query, so any diff --git a/test/e2e-v2/cases/cluster/docker-compose.yml b/test/e2e-v2/cases/cluster/docker-compose.yml index 0e6585b84..e4e899a24 100644 --- a/test/e2e-v2/cases/cluster/docker-compose.yml +++ b/test/e2e-v2/cases/cluster/docker-compose.yml @@ -20,7 +20,7 @@ services: extends: file: ../../script/docker-compose/base-compose.yml service: data - command: data --node-discovery-mode=file --node-discovery-file-path=/etc/banyandb/nodes.yaml + command: data --node-discovery-mode=file --node-discovery-file-path=/etc/banyandb/nodes.yaml ${BANYANDB_MEASURE_VECTORIZED_FLAGS:---measure-vectorized-enabled=false} volumes: - ./nodes.yaml:/etc/banyandb/nodes.yaml networks: @@ -30,7 +30,7 @@ services: extends: file: ../../script/docker-compose/base-compose.yml service: liaison - command: liaison --node-discovery-mode=file --node-discovery-file-path=/etc/banyandb/nodes.yaml + command: liaison --node-discovery-mode=file --node-discovery-file-path=/etc/banyandb/nodes.yaml ${BANYANDB_MEASURE_VECTORIZED_FLAGS:---measure-vectorized-enabled=false} volumes: - ./nodes.yaml:/etc/banyandb/nodes.yaml networks: diff --git a/test/e2e-v2/cases/event/banyandb/docker-compose.yml b/test/e2e-v2/cases/event/banyandb/docker-compose.yml index 62c9575bf..535169b93 100644 --- a/test/e2e-v2/cases/event/banyandb/docker-compose.yml +++ b/test/e2e-v2/cases/event/banyandb/docker-compose.yml @@ -18,7 +18,7 @@ services: extends: file: ../../../script/docker-compose/base-compose.yml service: banyandb - command: standalone + command: standalone ${BANYANDB_MEASURE_VECTORIZED_FLAGS:---measure-vectorized-enabled=false} 
networks: - e2e diff --git a/test/e2e-v2/cases/lifecycle/data-generator/docker-compose.yml b/test/e2e-v2/cases/lifecycle/data-generator/docker-compose.yml index 98f1078ac..d480d75b0 100644 --- a/test/e2e-v2/cases/lifecycle/data-generator/docker-compose.yml +++ b/test/e2e-v2/cases/lifecycle/data-generator/docker-compose.yml @@ -21,7 +21,7 @@ services: ports: - 17912:17912 - 17913:17913 - command: standalone --measure-metadata-cache-wait-duration 1m --stream-metadata-cache-wait-duration 1m + command: standalone --measure-metadata-cache-wait-duration 1m --stream-metadata-cache-wait-duration 1m ${BANYANDB_MEASURE_VECTORIZED_FLAGS:---measure-vectorized-enabled=false} networks: - e2e diff --git a/test/e2e-v2/cases/lifecycle/docker-compose.yml b/test/e2e-v2/cases/lifecycle/docker-compose.yml index a5fcaee1e..77991c147 100644 --- a/test/e2e-v2/cases/lifecycle/docker-compose.yml +++ b/test/e2e-v2/cases/lifecycle/docker-compose.yml @@ -19,7 +19,7 @@ services: file: ../../script/docker-compose/base-compose.yml service: data hostname: data-hot1 - command: data --node-discovery-mode=file --node-discovery-file-path=/etc/banyandb/nodes.yaml --node-labels type=hot + command: data --node-discovery-mode=file --node-discovery-file-path=/etc/banyandb/nodes.yaml --node-labels type=hot ${BANYANDB_MEASURE_VECTORIZED_FLAGS:---measure-vectorized-enabled=false} volumes: - ./data-generator/tmp/metadata:/tmp/measure/data/sw_metadata - ./nodes.yaml:/etc/banyandb/nodes.yaml @@ -31,7 +31,7 @@ services: file: ../../script/docker-compose/base-compose.yml service: data hostname: data-warm1 - command: data --node-discovery-mode=file --node-discovery-file-path=/etc/banyandb/nodes.yaml --node-labels type=warm + command: data --node-discovery-mode=file --node-discovery-file-path=/etc/banyandb/nodes.yaml --node-labels type=warm ${BANYANDB_MEASURE_VECTORIZED_FLAGS:---measure-vectorized-enabled=false} volumes: - ./nodes.yaml:/etc/banyandb/nodes.yaml networks: @@ -42,7 +42,7 @@ services: file: 
../../script/docker-compose/base-compose.yml service: data hostname: data-cold1 - command: data --node-discovery-mode=file --node-discovery-file-path=/etc/banyandb/nodes.yaml --node-labels type=cold + command: data --node-discovery-mode=file --node-discovery-file-path=/etc/banyandb/nodes.yaml --node-labels type=cold ${BANYANDB_MEASURE_VECTORIZED_FLAGS:---measure-vectorized-enabled=false} volumes: - ./data-generator/tmp/measure:/tmp/measure - ./data-generator/tmp/stream:/tmp/stream @@ -56,7 +56,7 @@ services: extends: file: ../../script/docker-compose/base-compose.yml service: liaison - command: liaison --node-discovery-mode=file --node-discovery-file-path=/etc/banyandb/nodes.yaml --data-node-selector type=hot + command: liaison --node-discovery-mode=file --node-discovery-file-path=/etc/banyandb/nodes.yaml --data-node-selector type=hot ${BANYANDB_MEASURE_VECTORIZED_FLAGS:---measure-vectorized-enabled=false} volumes: - ./nodes.yaml:/etc/banyandb/nodes.yaml networks: diff --git a/test/e2e-v2/cases/log/banyandb/docker-compose.yml b/test/e2e-v2/cases/log/banyandb/docker-compose.yml index 79eb59a3c..faab41b86 100644 --- a/test/e2e-v2/cases/log/banyandb/docker-compose.yml +++ b/test/e2e-v2/cases/log/banyandb/docker-compose.yml @@ -20,6 +20,7 @@ services: extends: file: ../../../script/docker-compose/base-compose.yml service: banyandb + command: standalone ${BANYANDB_MEASURE_VECTORIZED_FLAGS:---measure-vectorized-enabled=false} networks: - e2e diff --git a/test/e2e-v2/cases/profiling/trace/banyandb/docker-compose.yml b/test/e2e-v2/cases/profiling/trace/banyandb/docker-compose.yml index e09b185f8..7cf389453 100644 --- a/test/e2e-v2/cases/profiling/trace/banyandb/docker-compose.yml +++ b/test/e2e-v2/cases/profiling/trace/banyandb/docker-compose.yml @@ -20,7 +20,7 @@ services: extends: file: ../../../../script/docker-compose/base-compose.yml service: banyandb - command: standalone + command: standalone ${BANYANDB_MEASURE_VECTORIZED_FLAGS:---measure-vectorized-enabled=false} 
networks: - e2e diff --git a/test/e2e-v2/cases/rover/process/istio/banyandb/banyandb-deployment.yaml b/test/e2e-v2/cases/rover/process/istio/banyandb/banyandb-deployment-vec.yaml similarity index 97% copy from test/e2e-v2/cases/rover/process/istio/banyandb/banyandb-deployment.yaml copy to test/e2e-v2/cases/rover/process/istio/banyandb/banyandb-deployment-vec.yaml index 0c366a8fe..3176c00d0 100644 --- a/test/e2e-v2/cases/rover/process/istio/banyandb/banyandb-deployment.yaml +++ b/test/e2e-v2/cases/rover/process/istio/banyandb/banyandb-deployment-vec.yaml @@ -36,6 +36,7 @@ spec: imagePullPolicy: IfNotPresent args: - standalone + - --measure-vectorized-enabled=true ports: - name: grpc containerPort: 17912 diff --git a/test/e2e-v2/cases/rover/process/istio/banyandb/banyandb-deployment.yaml b/test/e2e-v2/cases/rover/process/istio/banyandb/banyandb-deployment.yaml index 0c366a8fe..86bbfb038 100644 --- a/test/e2e-v2/cases/rover/process/istio/banyandb/banyandb-deployment.yaml +++ b/test/e2e-v2/cases/rover/process/istio/banyandb/banyandb-deployment.yaml @@ -36,6 +36,7 @@ spec: imagePullPolicy: IfNotPresent args: - standalone + - --measure-vectorized-enabled=false ports: - name: grpc containerPort: 17912 diff --git a/test/e2e-v2/cases/rover/process/istio/banyandb/e2e-banyandb.yaml b/test/e2e-v2/cases/rover/process/istio/banyandb/e2e-banyandb.yaml index 200167081..c9816645a 100644 --- a/test/e2e-v2/cases/rover/process/istio/banyandb/e2e-banyandb.yaml +++ b/test/e2e-v2/cases/rover/process/istio/banyandb/e2e-banyandb.yaml @@ -48,7 +48,7 @@ setup: - name: Load BanyanDB image into kind command: kind load docker-image apache/skywalking-banyandb:${TAG}-testing --name kind - name: Deploy BanyanDB - command: envsubst < test/e2e-v2/cases/rover/process/istio/banyandb/banyandb-deployment.yaml | kubectl apply -f - + command: envsubst < ${BANYANDB_DEPLOYMENT_FILE:-test/e2e-v2/cases/rover/process/istio/banyandb/banyandb-deployment.yaml} | kubectl apply -f - - name: Wait for BanyanDB 
command: kubectl rollout status deployment/banyandb -n istio-system --timeout=120s - name: Install SkyWalking diff --git a/test/e2e-v2/cases/storage/banyandb/docker-compose.yml b/test/e2e-v2/cases/storage/banyandb/docker-compose.yml index 8d12c8e83..a9942588c 100644 --- a/test/e2e-v2/cases/storage/banyandb/docker-compose.yml +++ b/test/e2e-v2/cases/storage/banyandb/docker-compose.yml @@ -20,7 +20,7 @@ services: extends: file: ../../../script/docker-compose/base-compose.yml service: banyandb - command: standalone + command: standalone ${BANYANDB_MEASURE_VECTORIZED_FLAGS:---measure-vectorized-enabled=false} networks: - e2e diff --git a/test/e2e-v2/cases/zipkin/banyandb/docker-compose.yml b/test/e2e-v2/cases/zipkin/banyandb/docker-compose.yml index 67ddc3fee..e09297e9d 100644 --- a/test/e2e-v2/cases/zipkin/banyandb/docker-compose.yml +++ b/test/e2e-v2/cases/zipkin/banyandb/docker-compose.yml @@ -20,6 +20,7 @@ services: extends: file: ../../../script/docker-compose/base-compose.yml service: banyandb + command: standalone ${BANYANDB_MEASURE_VECTORIZED_FLAGS:---measure-vectorized-enabled=false} networks: - e2e
