Copilot commented on code in PR #1129:
URL: 
https://github.com/apache/skywalking-banyandb/pull/1129#discussion_r3244459351


##########
pkg/query/vectorized/pipeline.go:
##########
@@ -0,0 +1,177 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package vectorized
+
+import (
+       "context"
+       "errors"
+)
+
+// Pipeline is the composed sequence of stages from source to final breaker.
+// It exposes a single PullOperator-shaped Next method to the driver.
+type Pipeline struct {
+       head    PullOperator
+       tracker *MemoryTracker
+       closed  bool
+}
+
+// Init cascades initialization down through every stage to the source.
+// Must be called once before the first Next, after Build. Re-calling is
+// safe but pointless — each stage's Init is idempotent only if its
+// underlying operator's Init is.
+func (p *Pipeline) Init(ctx context.Context) error {
+       return p.head.Init(ctx)
+}
+
+// Next returns the next batch from the head stage.
+func (p *Pipeline) Next(ctx context.Context) (*RecordBatch, error) {
+       return p.head.NextBatch(ctx)
+}
+
+// Tracker returns the shared per-pipeline MemoryTracker, or nil if the builder
+// did not set one. Operators that bookkeep memory should be constructed with
+// this tracker so they all draw from a single budget.
+func (p *Pipeline) Tracker() *MemoryTracker { return p.tracker }
+
+// Close closes the head stage. Idempotent — repeat calls are no-ops.
+func (p *Pipeline) Close() error {
+       if p.closed {
+               return nil
+       }
+       p.closed = true
+       return p.head.Close()
+}
+
+// PipelineBuilder fluently composes a Pipeline.
+type PipelineBuilder struct {
+       source       PullOperator
+       tracker      *MemoryTracker
+       pendingFused []FusibleOperator
+       breakers     []BreakerOperator
+}
+
+// NewPipelineBuilder starts a builder.
+func NewPipelineBuilder() *PipelineBuilder { return &PipelineBuilder{} }
+
+// From sets the leaf source.
+func (b *PipelineBuilder) From(p PullOperator) *PipelineBuilder { b.source = 
p; return b }
+
+// WithMemoryTracker attaches a shared MemoryTracker to the pipeline. Operators
+// that bookkeep memory (BatchGroupBy, BatchAggregation) should be constructed
+// with this same tracker so reservations stack against a single budget.
+func (b *PipelineBuilder) WithMemoryTracker(t *MemoryTracker) *PipelineBuilder 
{
+       b.tracker = t
+       return b
+}
+
+// Apply queues a FusibleOperator to fold into the next stage.
+func (b *PipelineBuilder) Apply(f FusibleOperator) *PipelineBuilder {
+       b.pendingFused = append(b.pendingFused, f)
+       return b
+}
+
+// Break appends a breaker, finalizing the current fused-stage prefix.
+func (b *PipelineBuilder) Break(br BreakerOperator) *PipelineBuilder {
+       b.breakers = append(b.breakers, br)
+       return b
+}
+
+// Build validates and constructs the Pipeline.
+func (b *PipelineBuilder) Build() (*Pipeline, error) {
+       if b.source == nil {
+               return nil, errors.New("vectorized: pipeline missing source 
(use From)")
+       }
+       var head PullOperator = newFusedStage(b.source, b.pendingFused)
+       for _, br := range b.breakers {
+               head = newBreakerStage(head, br)
+       }
+       return &Pipeline{head: head, tracker: b.tracker}, nil
+}

Review Comment:
   PipelineBuilder currently builds the pipeline as `fusedStage(source, 
pendingFused)` and then wraps *all* breakers afterwards, which means any 
`Apply()` calls made after a `Break()` still execute *before* the breaker(s). 
The vec plan builder does exactly that (e.g. `plan.Limit.Build` calls `Apply` 
after `plan.GroupByAgg.Build` has already called `Break`), so `LIMIT/OFFSET` 
can be incorrectly pushed down ahead of aggregation/top and change query 
results. Consider changing the builder to preserve operator order (e.g., flush 
pending fused into the current head on each `Break`, reset `pendingFused`, and 
allow multiple fused stages).



##########
pkg/query/vectorized/measure/serialize.go:
##########
@@ -0,0 +1,166 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "slices"
+       "time"
+
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+)
+
+// serializeBatchToProto converts the active rows of b into 
measurev1.InternalDataPoint
+// messages, appending to dst. Pass dst=nil to allocate; pass dst[:0] to reuse 
capacity.
+//
+// This is the focal point of differential parity testing: it is the only place
+// where the vectorized output shape diverges from the row-based output shape.
+// Row order matches batch row order (respecting Selection); the row-based path
+// produces messages in the same order from MeasureResult iteration.
+func serializeBatchToProto(b *vectorized.RecordBatch, dst 
[]*measurev1.InternalDataPoint) []*measurev1.InternalDataPoint {
+       if dst == nil {
+               dst = make([]*measurev1.InternalDataPoint, 0, b.ActiveLen())
+       }
+       schema := b.Schema
+       active := activeIndices(b)
+       for _, rowIdx := range active {
+               dp := buildDataPoint(b, schema, int(rowIdx))

Review Comment:
   `serializeBatchToProto` calls `activeIndices(b)`, which allocates a fresh 
`[]uint16` whenever `Selection` is nil. `BatchScan` emits batches with 
`Selection == nil`, so this adds a per-batch allocation on the hot path and 
undermines the PR's stated zero-alloc egress goal. Consider iterating directly 
without materializing indices (fast path: `for i := 0; i < b.Len; i++ { ... }`, 
else iterate `Selection`).



##########
pkg/query/vectorized/measure/serialize.go:
##########
@@ -0,0 +1,166 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "slices"
+       "time"
+
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+)
+
+// serializeBatchToProto converts the active rows of b into 
measurev1.InternalDataPoint
+// messages, appending to dst. Pass dst=nil to allocate; pass dst[:0] to reuse 
capacity.
+//
+// This is the focal point of differential parity testing: it is the only place
+// where the vectorized output shape diverges from the row-based output shape.
+// Row order matches batch row order (respecting Selection); the row-based path
+// produces messages in the same order from MeasureResult iteration.
+func serializeBatchToProto(b *vectorized.RecordBatch, dst 
[]*measurev1.InternalDataPoint) []*measurev1.InternalDataPoint {
+       if dst == nil {
+               dst = make([]*measurev1.InternalDataPoint, 0, b.ActiveLen())
+       }
+       schema := b.Schema
+       active := activeIndices(b)
+       for _, rowIdx := range active {
+               dp := buildDataPoint(b, schema, int(rowIdx))
+               idp := &measurev1.InternalDataPoint{DataPoint: dp}
+               if i := schema.ShardIDIndex(); i >= 0 {
+                       idp.ShardId = 
uint32(b.Columns[i].(*vectorized.TypedColumn[int64]).Data()[rowIdx])
+               }
+               dst = append(dst, idp)
+       }
+       return dst
+}
+
+// buildDataPoint materializes one DataPoint from row rowIdx of b. Tags are
+// emitted family-by-family using the schema's pre-computed TagFamilyGroups
+// layout — no per-row map allocation. Field columns become DataPoint_Field
+// entries in schema order.
+func buildDataPoint(b *vectorized.RecordBatch, schema *vectorized.BatchSchema, 
rowIdx int) *measurev1.DataPoint {
+       dp := &measurev1.DataPoint{}
+       if i := schema.TimestampIndex(); i >= 0 {
+               ns := 
b.Columns[i].(*vectorized.TypedColumn[int64]).Data()[rowIdx]
+               dp.Timestamp = timestamppb.New(time.Unix(0, ns))
+       }
+       if i := schema.VersionIndex(); i >= 0 {
+               dp.Version = 
b.Columns[i].(*vectorized.TypedColumn[int64]).Data()[rowIdx]
+       }
+       if i := schema.SeriesIDIndex(); i >= 0 {
+               dp.Sid = 
uint64(b.Columns[i].(*vectorized.TypedColumn[int64]).Data()[rowIdx])
+       }
+       if len(schema.TagFamilyGroups) > 0 {
+               dp.TagFamilies = make([]*modelv1.TagFamily, 0, 
len(schema.TagFamilyGroups))
+               for _, group := range schema.TagFamilyGroups {
+                       tf := &modelv1.TagFamily{
+                               Name: group.Family,
+                               Tags: make([]*modelv1.Tag, 0, 
len(group.Columns)),
+                       }
+                       for _, colIdx := range group.Columns {
+                               tf.Tags = append(tf.Tags, &modelv1.Tag{
+                                       Key:   schema.Columns[colIdx].Name,
+                                       Value: 
columnValueToTagValue(b.Columns[colIdx], rowIdx),
+                               })
+                       }
+                       dp.TagFamilies = append(dp.TagFamilies, tf)
+               }
+       }
+       if len(schema.FieldColumns) > 0 {
+               dp.Fields = make([]*measurev1.DataPoint_Field, 0, 
len(schema.FieldColumns))
+               for _, colIdx := range schema.FieldColumns {
+                       dp.Fields = append(dp.Fields, 
&measurev1.DataPoint_Field{
+                               Name:  schema.Columns[colIdx].Name,
+                               Value: 
columnValueToFieldValue(b.Columns[colIdx], rowIdx),
+                       })
+               }
+       }
+       return dp
+}
+
+// columnValueToTagValue materializes a *modelv1.TagValue from one row of col.
+//
+// Passthrough columns (TypedColumn[*modelv1.TagValue]) take a fast path:
+// the original protobuf pointer from the scan source is returned directly
+// — zero allocation, byte-identical to what the row path emits.
+//
+// Typed columns reconstruct the protobuf wrapper. Slice-typed values
+// (BinaryData, IntArray, StrArray) are defensively copied so the produced
+// TagValue does not alias the column's backing slice across pool reuse.
+func columnValueToTagValue(col vectorized.Column, rowIdx int) 
*modelv1.TagValue {
+       if pc, ok := col.(*vectorized.TypedColumn[*modelv1.TagValue]); ok {
+               v := pc.Data()[rowIdx]
+               if v == nil {
+                       return pbv1NullTagValueRef
+               }
+               return v
+       }
+       if col.IsNull(rowIdx) {
+               return &modelv1.TagValue{Value: &modelv1.TagValue_Null{}}
+       }

Review Comment:
   For typed columns, null tag cells currently allocate a new 
`&modelv1.TagValue{Value: &modelv1.TagValue_Null{}}` per row. Since this 
package already has the `pbv1NullTagValueRef` singleton for null fill, consider 
returning that here as well to avoid per-cell allocations and keep null 
representation consistent across passthrough vs typed columns.



##########
pkg/query/vectorized/measure/serialize.go:
##########
@@ -0,0 +1,166 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "slices"
+       "time"
+
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+)
+
+// serializeBatchToProto converts the active rows of b into 
measurev1.InternalDataPoint
+// messages, appending to dst. Pass dst=nil to allocate; pass dst[:0] to reuse 
capacity.
+//
+// This is the focal point of differential parity testing: it is the only place
+// where the vectorized output shape diverges from the row-based output shape.
+// Row order matches batch row order (respecting Selection); the row-based path
+// produces messages in the same order from MeasureResult iteration.
+func serializeBatchToProto(b *vectorized.RecordBatch, dst 
[]*measurev1.InternalDataPoint) []*measurev1.InternalDataPoint {
+       if dst == nil {
+               dst = make([]*measurev1.InternalDataPoint, 0, b.ActiveLen())
+       }
+       schema := b.Schema
+       active := activeIndices(b)
+       for _, rowIdx := range active {
+               dp := buildDataPoint(b, schema, int(rowIdx))
+               idp := &measurev1.InternalDataPoint{DataPoint: dp}
+               if i := schema.ShardIDIndex(); i >= 0 {
+                       idp.ShardId = 
uint32(b.Columns[i].(*vectorized.TypedColumn[int64]).Data()[rowIdx])
+               }
+               dst = append(dst, idp)
+       }
+       return dst
+}
+
+// buildDataPoint materializes one DataPoint from row rowIdx of b. Tags are
+// emitted family-by-family using the schema's pre-computed TagFamilyGroups
+// layout — no per-row map allocation. Field columns become DataPoint_Field
+// entries in schema order.
+func buildDataPoint(b *vectorized.RecordBatch, schema *vectorized.BatchSchema, 
rowIdx int) *measurev1.DataPoint {
+       dp := &measurev1.DataPoint{}
+       if i := schema.TimestampIndex(); i >= 0 {
+               ns := 
b.Columns[i].(*vectorized.TypedColumn[int64]).Data()[rowIdx]
+               dp.Timestamp = timestamppb.New(time.Unix(0, ns))
+       }
+       if i := schema.VersionIndex(); i >= 0 {
+               dp.Version = 
b.Columns[i].(*vectorized.TypedColumn[int64]).Data()[rowIdx]
+       }
+       if i := schema.SeriesIDIndex(); i >= 0 {
+               dp.Sid = 
uint64(b.Columns[i].(*vectorized.TypedColumn[int64]).Data()[rowIdx])
+       }
+       if len(schema.TagFamilyGroups) > 0 {
+               dp.TagFamilies = make([]*modelv1.TagFamily, 0, 
len(schema.TagFamilyGroups))
+               for _, group := range schema.TagFamilyGroups {
+                       tf := &modelv1.TagFamily{
+                               Name: group.Family,
+                               Tags: make([]*modelv1.Tag, 0, 
len(group.Columns)),
+                       }
+                       for _, colIdx := range group.Columns {
+                               tf.Tags = append(tf.Tags, &modelv1.Tag{
+                                       Key:   schema.Columns[colIdx].Name,
+                                       Value: 
columnValueToTagValue(b.Columns[colIdx], rowIdx),
+                               })
+                       }
+                       dp.TagFamilies = append(dp.TagFamilies, tf)
+               }
+       }
+       if len(schema.FieldColumns) > 0 {
+               dp.Fields = make([]*measurev1.DataPoint_Field, 0, 
len(schema.FieldColumns))
+               for _, colIdx := range schema.FieldColumns {
+                       dp.Fields = append(dp.Fields, 
&measurev1.DataPoint_Field{
+                               Name:  schema.Columns[colIdx].Name,
+                               Value: 
columnValueToFieldValue(b.Columns[colIdx], rowIdx),
+                       })
+               }
+       }
+       return dp
+}
+
+// columnValueToTagValue materializes a *modelv1.TagValue from one row of col.
+//
+// Passthrough columns (TypedColumn[*modelv1.TagValue]) take a fast path:
+// the original protobuf pointer from the scan source is returned directly
+// — zero allocation, byte-identical to what the row path emits.
+//
+// Typed columns reconstruct the protobuf wrapper. Slice-typed values
+// (BinaryData, IntArray, StrArray) are defensively copied so the produced
+// TagValue does not alias the column's backing slice across pool reuse.
+func columnValueToTagValue(col vectorized.Column, rowIdx int) 
*modelv1.TagValue {
+       if pc, ok := col.(*vectorized.TypedColumn[*modelv1.TagValue]); ok {
+               v := pc.Data()[rowIdx]
+               if v == nil {
+                       return pbv1NullTagValueRef
+               }
+               return v
+       }
+       if col.IsNull(rowIdx) {
+               return &modelv1.TagValue{Value: &modelv1.TagValue_Null{}}
+       }
+       switch c := col.(type) {
+       case *vectorized.TypedColumn[int64]:
+               return &modelv1.TagValue{Value: &modelv1.TagValue_Int{Int: 
&modelv1.Int{Value: c.Data()[rowIdx]}}}
+       case *vectorized.TypedColumn[string]:
+               return &modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: 
&modelv1.Str{Value: c.Data()[rowIdx]}}}
+       case *vectorized.TypedColumn[[]byte]:
+               src := c.Data()[rowIdx]
+               buf := make([]byte, len(src))
+               copy(buf, src)
+               return &modelv1.TagValue{Value: 
&modelv1.TagValue_BinaryData{BinaryData: buf}}
+       case *vectorized.TypedColumn[[]int64]:
+               return &modelv1.TagValue{Value: 
&modelv1.TagValue_IntArray{IntArray: &modelv1.IntArray{Value: 
slices.Clone(c.Data()[rowIdx])}}}
+       case *vectorized.TypedColumn[[]string]:
+               return &modelv1.TagValue{Value: 
&modelv1.TagValue_StrArray{StrArray: &modelv1.StrArray{Value: 
slices.Clone(c.Data()[rowIdx])}}}
+       }
+       return &modelv1.TagValue{Value: &modelv1.TagValue_Null{}}
+}
+
+// columnValueToFieldValue is the field-side counterpart. Passthrough columns
+// for FieldValue return the source pointer directly; typed columns
+// reconstruct the protobuf wrapper with the same defensive-copy rule for
+// BinaryData.
+func columnValueToFieldValue(col vectorized.Column, rowIdx int) 
*modelv1.FieldValue {
+       if pc, ok := col.(*vectorized.TypedColumn[*modelv1.FieldValue]); ok {
+               v := pc.Data()[rowIdx]
+               if v == nil {
+                       return pbv1NullFieldValueRef
+               }
+               return v
+       }
+       if col.IsNull(rowIdx) {
+               return &modelv1.FieldValue{Value: &modelv1.FieldValue_Null{}}
+       }

Review Comment:
   For typed columns, null field cells currently allocate a new 
`&modelv1.FieldValue{Value: &modelv1.FieldValue_Null{}}` per row. Since 
`pbv1NullFieldValueRef` already exists, consider returning the singleton here 
too to avoid per-cell allocations and keep null representation consistent 
across passthrough vs typed columns.



##########
scripts/soak-monitor.sh:
##########
@@ -0,0 +1,172 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# G5d soak monitor — polls the most recent run under dist/soak/ on a
+# tapered cadence and writes a one-line status per tick. Each line is
+# tagged either OK or ALERT; ALERT means at least one of:
+#   - banyand.log has not been touched in >10 min (container hung)
+#   - memory-alerts.log gained any line (acceptance criterion 3 violated)
+#   - any diff-*.json shows "pass": false (acceptance criterion 2 violated)
+#   - docker compose health probe reports degraded
+#
+# Cadence (matching the operator request):
+#   ticks 1..8 — every 15 min (covers the first 2 h)
+#   ticks 9..  — every 60 min (covers the remainder of the 48 h window)
+#
+# Exits automatically when the run's summary.json appears (soak
+# complete) or on Ctrl-C. Returns non-zero if any ALERT line was
+# emitted — so a wrapper can chain to a notification mechanism.
+#
+# Usage:
+#   ./scripts/soak-monitor.sh                       # watch most recent run
+#   ./scripts/soak-monitor.sh dist/soak/20260512T101010   # specific run
+#
+# Env overrides (rarely needed):
+#   FIRST_PHASE_TICKS    number of 15-min ticks before slowing down (default 8)
+#   FAST_INTERVAL_SEC    fast cadence in seconds (default 900)
+#   SLOW_INTERVAL_SEC    slow cadence in seconds (default 3600)
+#   LOG_STALE_SEC        ALERT threshold for banyand.log freshness (default 
600)
+
+set -euo pipefail
+
+REPO_ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
+COMPOSE_FILE="${REPO_ROOT}/test/soak/docker-compose.soak.yaml"
+
+FIRST_PHASE_TICKS="${FIRST_PHASE_TICKS:-8}"
+FAST_INTERVAL_SEC="${FAST_INTERVAL_SEC:-900}"
+SLOW_INTERVAL_SEC="${SLOW_INTERVAL_SEC:-3600}"
+LOG_STALE_SEC="${LOG_STALE_SEC:-600}"
+
+# Resolve the run directory.
+if (( $# >= 1 )); then
+  RUN="$1"
+else
+  RUN="$(ls -td "${REPO_ROOT}"/dist/soak/2026* 2>/dev/null | head -1 || true)"
+fi
+if [[ -z "${RUN:-}" || ! -d "${RUN}" ]]; then
+  echo "[soak-monitor] ERROR: no soak run directory found (looked under 
dist/soak/2026*)"
+  exit 1
+fi
+
+LOG="${RUN}/monitor.log"
+echo "[soak-monitor] watching ${RUN}"
+echo "[soak-monitor] writing status to ${LOG}"
+
+# Tee from this point on so the status log persists alongside the run.
+exec > >(tee -a "${LOG}") 2>&1
+
+count_or_zero() {
+  local n
+  n="$(eval "$1" 2>/dev/null | wc -l | tr -d ' ')"
+  echo "${n:-0}"
+}

Review Comment:
   `count_or_zero` uses `eval` to execute its argument. Even though current 
call sites are internal, this is an unnecessary footgun (easy to accidentally 
introduce command injection or quoting bugs later). Consider refactoring it to 
avoid `eval` (e.g., accept a command+args array, or make the callers perform 
the glob/grep/ls directly and pass only file paths).



##########
scripts/soak-monitor.sh:
##########
@@ -0,0 +1,172 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# G5d soak monitor — polls the most recent run under dist/soak/ on a
+# tapered cadence and writes a one-line status per tick. Each line is
+# tagged either OK or ALERT; ALERT means at least one of:
+#   - banyand.log has not been touched in >10 min (container hung)
+#   - memory-alerts.log gained any line (acceptance criterion 3 violated)
+#   - any diff-*.json shows "pass": false (acceptance criterion 2 violated)
+#   - docker compose health probe reports degraded
+#
+# Cadence (matching the operator request):
+#   ticks 1..8 — every 15 min (covers the first 2 h)
+#   ticks 9..  — every 60 min (covers the remainder of the 48 h window)
+#
+# Exits automatically when the run's summary.json appears (soak
+# complete) or on Ctrl-C. Returns non-zero if any ALERT line was
+# emitted — so a wrapper can chain to a notification mechanism.
+#
+# Usage:
+#   ./scripts/soak-monitor.sh                       # watch most recent run
+#   ./scripts/soak-monitor.sh dist/soak/20260512T101010   # specific run
+#
+# Env overrides (rarely needed):
+#   FIRST_PHASE_TICKS    number of 15-min ticks before slowing down (default 8)
+#   FAST_INTERVAL_SEC    fast cadence in seconds (default 900)
+#   SLOW_INTERVAL_SEC    slow cadence in seconds (default 3600)
+#   LOG_STALE_SEC        ALERT threshold for banyand.log freshness (default 
600)
+
+set -euo pipefail
+
+REPO_ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
+COMPOSE_FILE="${REPO_ROOT}/test/soak/docker-compose.soak.yaml"
+
+FIRST_PHASE_TICKS="${FIRST_PHASE_TICKS:-8}"
+FAST_INTERVAL_SEC="${FAST_INTERVAL_SEC:-900}"
+SLOW_INTERVAL_SEC="${SLOW_INTERVAL_SEC:-3600}"
+LOG_STALE_SEC="${LOG_STALE_SEC:-600}"
+
+# Resolve the run directory.
+if (( $# >= 1 )); then
+  RUN="$1"
+else
+  RUN="$(ls -td "${REPO_ROOT}"/dist/soak/2026* 2>/dev/null | head -1 || true)"
+fi
+if [[ -z "${RUN:-}" || ! -d "${RUN}" ]]; then
+  echo "[soak-monitor] ERROR: no soak run directory found (looked under 
dist/soak/2026*)"
+  exit 1

Review Comment:
   The default run-directory discovery is hard-coded to `dist/soak/2026*`, 
which will stop finding runs in subsequent years. Consider using a more general 
glob (e.g. `dist/soak/20*` or `dist/soak/*`) and keeping the error message in 
sync with the search pattern.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to