This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch vectorized-query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/vectorized-query by this push:
new 76baa6a4f test(soak): G9e.2 extend soak parity catalog with G9a-d query shapes
76baa6a4f is described below
commit 76baa6a4f376d566163d3b085ca83c746e7b802a
Author: Hongtao Gao <[email protected]>
AuthorDate: Fri May 15 15:58:20 2026 +0000
test(soak): G9e.2 extend soak parity catalog with G9a-d query shapes
catalogEntry now wraps a full *measurev1.QueryRequest decoded via
protojson (proto oneofs in Criteria/TagValue cannot round-trip through
stdlib encoding/json), keyed by a catalog-unique id. buildQueryRequest
proto.Clones the template, injects TimeRange, and sets the default Limit
only when the template leaves it unset, so GroupBy / Agg / Top /
OrderBy / Criteria / TagProjection ride through unchanged.
default.json grows from 1 scan entry to 8, exercising the G9 shapes
the soak replay-and-diff previously never covered: Top-N, scalar
reduce, raw GroupBy, GroupBy+Agg, hidden criteria tag, OrderBy, COUNT
— all against the seeded soak/soak_metric measure. COUNT runs on the
INT field (seed-fixture seeds no float field; COUNT-on-float parity is
covered by the G9e.1 bench W8).
---
cmd/soak-driver/catalog/default.json | 98 ++++++++++++++++++++++++++++++++++--
cmd/soak-driver/main.go | 85 +++++++++++++++++++------------
2 files changed, 148 insertions(+), 35 deletions(-)
diff --git a/cmd/soak-driver/catalog/default.json b/cmd/soak-driver/catalog/default.json
index b78cf5ece..05c15b23b 100644
--- a/cmd/soak-driver/catalog/default.json
+++ b/cmd/soak-driver/catalog/default.json
@@ -1,9 +1,99 @@
[
{
- "name": "soak_metric",
- "groups": ["soak"],
- "field_projection": {
- "names": ["value", "count"]
+ "id": "scan_projection",
+ "request": {
+ "name": "soak_metric",
+ "groups": ["soak"],
+ "fieldProjection": { "names": ["value", "count"] }
+ }
+ },
+ {
+ "id": "topn_value",
+ "request": {
+ "name": "soak_metric",
+ "groups": ["soak"],
+ "fieldProjection": { "names": ["value"] },
+ "top": { "number": 10, "fieldName": "value", "fieldValueSort": "SORT_DESC" }
+ }
+ },
+ {
+ "id": "scalar_reduce_sum_value",
+ "request": {
+ "name": "soak_metric",
+ "groups": ["soak"],
+ "fieldProjection": { "names": ["value"] },
+ "agg": { "function": "AGGREGATION_FUNCTION_SUM", "fieldName": "value" }
+ }
+ },
+ {
+ "id": "raw_groupby_service",
+ "request": {
+ "name": "soak_metric",
+ "groups": ["soak"],
+ "tagProjection": {
+ "tagFamilies": [{ "name": "default", "tags": ["service"] }]
+ },
+ "fieldProjection": { "names": ["value"] },
+ "groupBy": {
+ "tagProjection": {
+ "tagFamilies": [{ "name": "default", "tags": ["service"] }]
+ },
+ "fieldName": "value"
+ }
+ }
+ },
+ {
+ "id": "groupby_service_sum_value",
+ "request": {
+ "name": "soak_metric",
+ "groups": ["soak"],
+ "tagProjection": {
+ "tagFamilies": [{ "name": "default", "tags": ["service"] }]
+ },
+ "fieldProjection": { "names": ["value"] },
+ "groupBy": {
+ "tagProjection": {
+ "tagFamilies": [{ "name": "default", "tags": ["service"] }]
+ },
+ "fieldName": "value"
+ },
+ "agg": { "function": "AGGREGATION_FUNCTION_SUM", "fieldName": "value" }
+ }
+ },
+ {
+ "id": "hidden_criteria_instance",
+ "request": {
+ "name": "soak_metric",
+ "groups": ["soak"],
+ "tagProjection": {
+ "tagFamilies": [{ "name": "default", "tags": ["service"] }]
+ },
+ "fieldProjection": { "names": ["value"] },
+ "criteria": {
+ "condition": {
+ "name": "instance_id",
+ "op": "BINARY_OP_EQ",
+ "value": { "int": { "value": "3" } }
+ }
+ }
+ }
+ },
+ {
+ "id": "order_by_time_desc",
+ "request": {
+ "name": "soak_metric",
+ "groups": ["soak"],
+ "fieldProjection": { "names": ["value", "count"] },
+ "orderBy": { "sort": "SORT_DESC" }
+ }
+ },
+ {
+ "id": "count_value",
+ "request": {
+ "name": "soak_metric",
+ "groups": ["soak"],
+ "fieldProjection": { "names": ["value"] },
+ "agg": { "function": "AGGREGATION_FUNCTION_COUNT", "fieldName": "value" }
}
}
]
diff --git a/cmd/soak-driver/main.go b/cmd/soak-driver/main.go
index 85d962a25..45f413a00 100644
--- a/cmd/soak-driver/main.go
+++ b/cmd/soak-driver/main.go
@@ -58,12 +58,23 @@ const (
soakFieldCount = "count"
)
-// catalogEntry holds the user-visible fields from the JSON catalog.
-// TimeRange is injected at runtime.
+// catalogEntry holds one query template from the JSON catalog. ID is a
+// catalog-unique label used as the baseline key (the proto measure name
+// is the same for every entry, so it cannot key the baseline). Request is
+// a proto-JSON QueryRequest so proto oneofs (Criteria, TagValue) and
+// enums (AggregationFunction, Sort) round-trip correctly — stdlib
+// encoding/json cannot populate proto oneof interface fields. TimeRange
+// is injected at runtime by buildQueryRequest; Limit only when unset.
type catalogEntry struct {
- FieldProjection *measurev1.QueryRequest_FieldProjection `json:"field_projection,omitempty"`
- Name string `json:"name"`
- Groups []string `json:"groups"`
+ Request *measurev1.QueryRequest
+ ID string
+}
+
+// rawCatalogEntry is the on-disk shape: an "id" label plus a proto-JSON
+// "request" object decoded separately via protojson.
+type rawCatalogEntry struct {
+ ID string `json:"id"`
+ Request json.RawMessage `json:"request"`
}
// baselineRecord is persisted to disk after record-baseline runs.
@@ -116,15 +127,29 @@ func dialInsecure(addr string) (*grpc.ClientConn, error) {
return conn, nil
}
-// loadCatalog reads the JSON catalog file and returns its entries.
+// loadCatalog reads the JSON catalog file and returns its entries. The
+// catalog is a JSON array of {"id", "request"} objects: stdlib json splits
+// the array and reads each id, then protojson decodes each "request" into
+// a QueryRequest so proto oneofs/enums round-trip. Empty ids are rejected.
func loadCatalog(path string) ([]catalogEntry, error) {
raw, readErr := os.ReadFile(path)
if readErr != nil {
return nil, fmt.Errorf("read catalog %s: %w", path, readErr)
}
- var entries []catalogEntry
- if unmarshalErr := json.Unmarshal(raw, &entries); unmarshalErr != nil {
- return nil, fmt.Errorf("unmarshal catalog: %w", unmarshalErr)
+ var rawEntries []rawCatalogEntry
+ if unmarshalErr := json.Unmarshal(raw, &rawEntries); unmarshalErr != nil {
+ return nil, fmt.Errorf("unmarshal catalog array: %w", unmarshalErr)
+ }
+ entries := make([]catalogEntry, 0, len(rawEntries))
+ for idx, rawEntry := range rawEntries {
+ if rawEntry.ID == "" {
+ return nil, fmt.Errorf("catalog entry %d: missing id", idx)
+ }
+ req := new(measurev1.QueryRequest)
+ if protoErr := protojson.Unmarshal(rawEntry.Request, req); protoErr != nil {
+ return nil, fmt.Errorf("unmarshal catalog entry %q: %w", rawEntry.ID, protoErr)
+ }
+ entries = append(entries, catalogEntry{ID: rawEntry.ID, Request: req})
}
return entries, nil
}
@@ -135,17 +160,13 @@ func loadCatalog(path string) ([]catalogEntry, error) {
func buildQueryRequest(entry catalogEntry, untilMs int64) *measurev1.QueryRequest {
untilTime := time.UnixMilli(untilMs)
beginTime := untilTime.Add(-1 * time.Hour)
- req := &measurev1.QueryRequest{
- Name: entry.Name,
- Groups: entry.Groups,
- TimeRange: &modelv1.TimeRange{
- Begin: timestamppb.New(beginTime),
- End: timestamppb.New(untilTime),
- },
- Limit: 100000,
+ req, _ := proto.Clone(entry.Request).(*measurev1.QueryRequest)
+ req.TimeRange = &modelv1.TimeRange{
+ Begin: timestamppb.New(beginTime),
+ End: timestamppb.New(untilTime),
}
- if entry.FieldProjection != nil {
- req.FieldProjection = entry.FieldProjection
+ if req.GetLimit() == 0 {
+ req.Limit = 100000
}
return req
}
@@ -175,29 +196,30 @@ func newRecordBaselineCmd() *cobra.Command {
for _, entry := range entries {
req := buildQueryRequest(entry, untilMs)
+ queryName := entry.ID
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
resp, queryErr := client.Query(ctx, req)
cancel()
if queryErr != nil {
failed++
- fmt.Printf("[record-baseline] %s: SKIP (%v)\n", entry.Name, queryErr)
+ fmt.Printf("[record-baseline] %s: SKIP (%v)\n", queryName, queryErr)
continue
}
rec := baselineRecord{
- QueryName: entry.Name,
- Groups: entry.Groups,
+ QueryName: queryName,
+ Groups: entry.Request.GetGroups(),
UntilMs: untilMs,
}
for _, dp := range resp.GetDataPoints() {
raw, marshalErr := protojson.Marshal(dp)
if marshalErr != nil {
- return fmt.Errorf("marshal data point for %s: %w", entry.Name, marshalErr)
+ return fmt.Errorf("marshal data point for %s: %w", queryName, marshalErr)
}
rec.DataPoints = append(rec.DataPoints,
json.RawMessage(raw))
}
records = append(records, rec)
succeeded++
- fmt.Printf("[record-baseline] %s: %d data points\n", entry.Name, len(rec.DataPoints))
+ fmt.Printf("[record-baseline] %s: %d data points\n", queryName, len(rec.DataPoints))
}
if succeeded == 0 {
return fmt.Errorf("record-baseline: all %d catalog queries failed (no usable baseline)", failed)
@@ -269,13 +291,14 @@ func newReplayAndDiffCmd() *cobra.Command {
}
for _, entry := range entries {
- rec, ok := baselineMap[entry.Name]
+ queryName := entry.ID
+ rec, ok := baselineMap[queryName]
if !ok {
// Catalog has a query the baseline doesn't (e.g. baseline
// skipped it because the measure wasn't installed yet).
// Skip the diff for this entry rather than failing the
// whole replay.
- fmt.Printf("[replay-and-diff] %s: SKIP (no baseline)\n", entry.Name)
+ fmt.Printf("[replay-and-diff] %s: SKIP (no baseline)\n", queryName)
continue
}
req := buildQueryRequest(entry, rec.UntilMs)
@@ -287,10 +310,10 @@ func newReplayAndDiffCmd() *cobra.Command {
// pass/fail signal still flips, but don't abort early —
// we want to attempt every catalog entry.
report.Divergences = append(report.Divergences, divergence{
- QueryName: entry.Name,
+ QueryName: queryName,
})
report.Pass = false
- fmt.Printf("[replay-and-diff] %s: FAIL (%v)\n", entry.Name, queryErr)
+ fmt.Printf("[replay-and-diff] %s: FAIL (%v)\n", queryName, queryErr)
continue
}
report.QueriesRun++
@@ -298,7 +321,7 @@ func newReplayAndDiffCmd() *cobra.Command {
replayDPs := resp.GetDataPoints()
if len(replayDPs) != len(rec.DataPoints) {
div := divergence{
- QueryName: entry.Name,
+ QueryName: queryName,
BaselineLen: len(rec.DataPoints),
ReplayLen: len(replayDPs),
}
@@ -307,12 +330,12 @@ func newReplayAndDiffCmd() *cobra.Command {
continue
}
- div := divergence{QueryName: entry.Name, BaselineLen: len(rec.DataPoints), ReplayLen: len(replayDPs)}
+ div := divergence{QueryName: queryName, BaselineLen: len(rec.DataPoints), ReplayLen: len(replayDPs)}
hasDiff := false
for idx, baselineRaw := range rec.DataPoints {
baselineDP := new(measurev1.DataPoint)
if parseErr := protojson.Unmarshal(baselineRaw, baselineDP); parseErr != nil {
- return fmt.Errorf("unmarshal baseline dp %d for %s: %w", idx, entry.Name, parseErr)
+ return fmt.Errorf("unmarshal baseline dp %d for %s: %w", idx, queryName, parseErr)
}
if !proto.Equal(baselineDP, replayDPs[idx]) {
hasDiff = true