This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch vectorized-query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/vectorized-query by this push:
     new 76baa6a4f test(soak): G9e.2 extend soak parity catalog with G9a-d 
query shapes
76baa6a4f is described below

commit 76baa6a4f376d566163d3b085ca83c746e7b802a
Author: Hongtao Gao <[email protected]>
AuthorDate: Fri May 15 15:58:20 2026 +0000

    test(soak): G9e.2 extend soak parity catalog with G9a-d query shapes
    
    catalogEntry now wraps a full *measurev1.QueryRequest decoded via
    protojson (proto oneofs in Criteria/TagValue cannot round-trip through
    stdlib encoding/json), keyed by a catalog-unique id. buildQueryRequest
    proto.Clones the template and injects TimeRange/Limit, so GroupBy /
    Agg / Top / OrderBy / Criteria / TagProjection ride through unchanged.
    
    default.json grows from 1 scan entry to 8, exercising the G9 shapes
    the soak replay-and-diff previously never covered: Top-N, scalar
    reduce, raw GroupBy, GroupBy+Agg, hidden criteria tag, OrderBy, COUNT
    — all against the seeded soak/soak_metric measure. COUNT runs on the
    INT field (seed-fixture seeds no float field; COUNT-on-float parity is
    covered by the G9e.1 bench W8).
---
 cmd/soak-driver/catalog/default.json | 98 ++++++++++++++++++++++++++++++++++--
 cmd/soak-driver/main.go              | 85 +++++++++++++++++++------------
 2 files changed, 148 insertions(+), 35 deletions(-)

diff --git a/cmd/soak-driver/catalog/default.json 
b/cmd/soak-driver/catalog/default.json
index b78cf5ece..05c15b23b 100644
--- a/cmd/soak-driver/catalog/default.json
+++ b/cmd/soak-driver/catalog/default.json
@@ -1,9 +1,99 @@
 [
   {
-    "name": "soak_metric",
-    "groups": ["soak"],
-    "field_projection": {
-      "names": ["value", "count"]
+    "id": "scan_projection",
+    "request": {
+      "name": "soak_metric",
+      "groups": ["soak"],
+      "fieldProjection": { "names": ["value", "count"] }
+    }
+  },
+  {
+    "id": "topn_value",
+    "request": {
+      "name": "soak_metric",
+      "groups": ["soak"],
+      "fieldProjection": { "names": ["value"] },
+      "top": { "number": 10, "fieldName": "value", "fieldValueSort": 
"SORT_DESC" }
+    }
+  },
+  {
+    "id": "scalar_reduce_sum_value",
+    "request": {
+      "name": "soak_metric",
+      "groups": ["soak"],
+      "fieldProjection": { "names": ["value"] },
+      "agg": { "function": "AGGREGATION_FUNCTION_SUM", "fieldName": "value" }
+    }
+  },
+  {
+    "id": "raw_groupby_service",
+    "request": {
+      "name": "soak_metric",
+      "groups": ["soak"],
+      "tagProjection": {
+        "tagFamilies": [{ "name": "default", "tags": ["service"] }]
+      },
+      "fieldProjection": { "names": ["value"] },
+      "groupBy": {
+        "tagProjection": {
+          "tagFamilies": [{ "name": "default", "tags": ["service"] }]
+        },
+        "fieldName": "value"
+      }
+    }
+  },
+  {
+    "id": "groupby_service_sum_value",
+    "request": {
+      "name": "soak_metric",
+      "groups": ["soak"],
+      "tagProjection": {
+        "tagFamilies": [{ "name": "default", "tags": ["service"] }]
+      },
+      "fieldProjection": { "names": ["value"] },
+      "groupBy": {
+        "tagProjection": {
+          "tagFamilies": [{ "name": "default", "tags": ["service"] }]
+        },
+        "fieldName": "value"
+      },
+      "agg": { "function": "AGGREGATION_FUNCTION_SUM", "fieldName": "value" }
+    }
+  },
+  {
+    "id": "hidden_criteria_instance",
+    "request": {
+      "name": "soak_metric",
+      "groups": ["soak"],
+      "tagProjection": {
+        "tagFamilies": [{ "name": "default", "tags": ["service"] }]
+      },
+      "fieldProjection": { "names": ["value"] },
+      "criteria": {
+        "condition": {
+          "name": "instance_id",
+          "op": "BINARY_OP_EQ",
+          "value": { "int": { "value": "3" } }
+        }
+      }
+    }
+  },
+  {
+    "id": "order_by_time_desc",
+    "request": {
+      "name": "soak_metric",
+      "groups": ["soak"],
+      "fieldProjection": { "names": ["value", "count"] },
+      "orderBy": { "sort": "SORT_DESC" }
+    }
+  },
+  {
+    "id": "count_value",
+    "request": {
+      "name": "soak_metric",
+      "groups": ["soak"],
+      "fieldProjection": { "names": ["value"] },
+      "agg": { "function": "AGGREGATION_FUNCTION_COUNT", "fieldName": "value" }
     }
   }
 ]
diff --git a/cmd/soak-driver/main.go b/cmd/soak-driver/main.go
index 85d962a25..45f413a00 100644
--- a/cmd/soak-driver/main.go
+++ b/cmd/soak-driver/main.go
@@ -58,12 +58,23 @@ const (
        soakFieldCount  = "count"
 )
 
-// catalogEntry holds the user-visible fields from the JSON catalog.
-// TimeRange is injected at runtime.
+// catalogEntry holds one query template from the JSON catalog. ID is a
+// catalog-unique label used as the baseline key (the proto measure name
+// is the same for every entry, so it cannot key the baseline). Request is
+// a proto-JSON QueryRequest so proto oneofs (Criteria, TagValue) and
+// enums (AggregationFunction, Sort) round-trip correctly — stdlib
+// encoding/json cannot populate proto oneof interface fields. TimeRange
+// is always set at runtime by buildQueryRequest; Limit only when unset.
 type catalogEntry struct {
-       FieldProjection *measurev1.QueryRequest_FieldProjection 
`json:"field_projection,omitempty"`
-       Name            string                                  `json:"name"`
-       Groups          []string                                `json:"groups"`
+       Request *measurev1.QueryRequest
+       ID      string
+}
+
+// rawCatalogEntry is the on-disk shape: an "id" label plus a proto-JSON
+// "request" object decoded separately via protojson.
+type rawCatalogEntry struct {
+       ID      string          `json:"id"`
+       Request json.RawMessage `json:"request"`
 }
 
 // baselineRecord is persisted to disk after record-baseline runs.
@@ -116,15 +127,29 @@ func dialInsecure(addr string) (*grpc.ClientConn, error) {
        return conn, nil
 }
 
-// loadCatalog reads the JSON catalog file and returns its entries.
+// loadCatalog reads the JSON catalog file and returns its entries. The
+// catalog is a JSON array of {"id", "request"} objects: stdlib json splits
+// the array and reads each id, then protojson decodes each "request" into
+// a QueryRequest so proto oneofs/enums round-trip.
 func loadCatalog(path string) ([]catalogEntry, error) {
        raw, readErr := os.ReadFile(path)
        if readErr != nil {
                return nil, fmt.Errorf("read catalog %s: %w", path, readErr)
        }
-       var entries []catalogEntry
-       if unmarshalErr := json.Unmarshal(raw, &entries); unmarshalErr != nil {
-               return nil, fmt.Errorf("unmarshal catalog: %w", unmarshalErr)
+       var rawEntries []rawCatalogEntry
+       if unmarshalErr := json.Unmarshal(raw, &rawEntries); unmarshalErr != 
nil {
+               return nil, fmt.Errorf("unmarshal catalog array: %w", 
unmarshalErr)
+       }
+       entries := make([]catalogEntry, 0, len(rawEntries))
+       for idx, rawEntry := range rawEntries {
+               if rawEntry.ID == "" {
+                       return nil, fmt.Errorf("catalog entry %d: missing id", 
idx)
+               }
+               req := new(measurev1.QueryRequest)
+               if protoErr := protojson.Unmarshal(rawEntry.Request, req); 
protoErr != nil {
+                       return nil, fmt.Errorf("unmarshal catalog entry %q: 
%w", rawEntry.ID, protoErr)
+               }
+               entries = append(entries, catalogEntry{ID: rawEntry.ID, 
Request: req})
        }
        return entries, nil
 }
@@ -135,17 +160,13 @@ func loadCatalog(path string) ([]catalogEntry, error) {
 func buildQueryRequest(entry catalogEntry, untilMs int64) 
*measurev1.QueryRequest {
        untilTime := time.UnixMilli(untilMs)
        beginTime := untilTime.Add(-1 * time.Hour)
-       req := &measurev1.QueryRequest{
-               Name:   entry.Name,
-               Groups: entry.Groups,
-               TimeRange: &modelv1.TimeRange{
-                       Begin: timestamppb.New(beginTime),
-                       End:   timestamppb.New(untilTime),
-               },
-               Limit: 100000,
+       req, _ := proto.Clone(entry.Request).(*measurev1.QueryRequest)
+       req.TimeRange = &modelv1.TimeRange{
+               Begin: timestamppb.New(beginTime),
+               End:   timestamppb.New(untilTime),
        }
-       if entry.FieldProjection != nil {
-               req.FieldProjection = entry.FieldProjection
+       if req.GetLimit() == 0 {
+               req.Limit = 100000
        }
        return req
 }
@@ -175,29 +196,30 @@ func newRecordBaselineCmd() *cobra.Command {
 
                        for _, entry := range entries {
                                req := buildQueryRequest(entry, untilMs)
+                               queryName := entry.ID
                                ctx, cancel := 
context.WithTimeout(context.Background(), 30*time.Second)
                                resp, queryErr := client.Query(ctx, req)
                                cancel()
                                if queryErr != nil {
                                        failed++
-                                       fmt.Printf("[record-baseline] %s: SKIP 
(%v)\n", entry.Name, queryErr)
+                                       fmt.Printf("[record-baseline] %s: SKIP 
(%v)\n", queryName, queryErr)
                                        continue
                                }
                                rec := baselineRecord{
-                                       QueryName: entry.Name,
-                                       Groups:    entry.Groups,
+                                       QueryName: queryName,
+                                       Groups:    entry.Request.GetGroups(),
                                        UntilMs:   untilMs,
                                }
                                for _, dp := range resp.GetDataPoints() {
                                        raw, marshalErr := protojson.Marshal(dp)
                                        if marshalErr != nil {
-                                               return fmt.Errorf("marshal data 
point for %s: %w", entry.Name, marshalErr)
+                                               return fmt.Errorf("marshal data 
point for %s: %w", queryName, marshalErr)
                                        }
                                        rec.DataPoints = append(rec.DataPoints, 
json.RawMessage(raw))
                                }
                                records = append(records, rec)
                                succeeded++
-                               fmt.Printf("[record-baseline] %s: %d data 
points\n", entry.Name, len(rec.DataPoints))
+                               fmt.Printf("[record-baseline] %s: %d data 
points\n", queryName, len(rec.DataPoints))
                        }
                        if succeeded == 0 {
                                return fmt.Errorf("record-baseline: all %d 
catalog queries failed (no usable baseline)", failed)
@@ -269,13 +291,14 @@ func newReplayAndDiffCmd() *cobra.Command {
                        }
 
                        for _, entry := range entries {
-                               rec, ok := baselineMap[entry.Name]
+                               queryName := entry.ID
+                               rec, ok := baselineMap[queryName]
                                if !ok {
                                        // Catalog has a query the baseline 
doesn't (e.g. baseline
                                        // skipped it because the measure 
wasn't installed yet).
                                        // Skip the diff for this entry rather 
than failing the
                                        // whole replay.
-                                       fmt.Printf("[replay-and-diff] %s: SKIP 
(no baseline)\n", entry.Name)
+                                       fmt.Printf("[replay-and-diff] %s: SKIP 
(no baseline)\n", queryName)
                                        continue
                                }
                                req := buildQueryRequest(entry, rec.UntilMs)
@@ -287,10 +310,10 @@ func newReplayAndDiffCmd() *cobra.Command {
                                        // pass/fail signal still flips, but 
don't abort early —
                                        // we want to attempt every catalog 
entry.
                                        report.Divergences = 
append(report.Divergences, divergence{
-                                               QueryName: entry.Name,
+                                               QueryName: queryName,
                                        })
                                        report.Pass = false
-                                       fmt.Printf("[replay-and-diff] %s: FAIL 
(%v)\n", entry.Name, queryErr)
+                                       fmt.Printf("[replay-and-diff] %s: FAIL 
(%v)\n", queryName, queryErr)
                                        continue
                                }
                                report.QueriesRun++
@@ -298,7 +321,7 @@ func newReplayAndDiffCmd() *cobra.Command {
                                replayDPs := resp.GetDataPoints()
                                if len(replayDPs) != len(rec.DataPoints) {
                                        div := divergence{
-                                               QueryName:   entry.Name,
+                                               QueryName:   queryName,
                                                BaselineLen: 
len(rec.DataPoints),
                                                ReplayLen:   len(replayDPs),
                                        }
@@ -307,12 +330,12 @@ func newReplayAndDiffCmd() *cobra.Command {
                                        continue
                                }
 
-                               div := divergence{QueryName: entry.Name, 
BaselineLen: len(rec.DataPoints), ReplayLen: len(replayDPs)}
+                               div := divergence{QueryName: queryName, 
BaselineLen: len(rec.DataPoints), ReplayLen: len(replayDPs)}
                                hasDiff := false
                                for idx, baselineRaw := range rec.DataPoints {
                                        baselineDP := new(measurev1.DataPoint)
                                        if parseErr := 
protojson.Unmarshal(baselineRaw, baselineDP); parseErr != nil {
-                                               return fmt.Errorf("unmarshal 
baseline dp %d for %s: %w", idx, entry.Name, parseErr)
+                                               return fmt.Errorf("unmarshal 
baseline dp %d for %s: %w", idx, queryName, parseErr)
                                        }
                                        if !proto.Equal(baselineDP, 
replayDPs[idx]) {
                                                hasDiff = true

Reply via email to