This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new b2be017c8 Fix migration failure when row replay from large data (#1154)
b2be017c8 is described below
commit b2be017c8373477a840d46a68fd3ef7b6728e601
Author: mrproliu <[email protected]>
AuthorDate: Thu Jun 4 16:50:42 2026 +0800
Fix migration failure when row-replaying large data (#1154)
---
.../backup/lifecycle/measure_migration_visitor.go | 22 +-
banyand/backup/lifecycle/row_replay_measure.go | 106 +---
banyand/backup/lifecycle/row_replay_pipeline.go | 351 +++++++++++++
banyand/backup/lifecycle/row_replay_stream.go | 98 +---
banyand/backup/lifecycle/row_replay_test.go | 550 +++++++++++++++++++++
banyand/backup/lifecycle/row_replay_trace.go | 102 +---
.../backup/lifecycle/stream_migration_visitor.go | 22 +-
.../backup/lifecycle/trace_migration_visitor.go | 30 +-
8 files changed, 951 insertions(+), 330 deletions(-)
diff --git a/banyand/backup/lifecycle/measure_migration_visitor.go
b/banyand/backup/lifecycle/measure_migration_visitor.go
index 128c77c09..b462ee33c 100644
--- a/banyand/backup/lifecycle/measure_migration_visitor.go
+++ b/banyand/backup/lifecycle/measure_migration_visitor.go
@@ -392,30 +392,18 @@ func (mv *measureMigrationVisitor) visitPartRowReplay(ctx
context.Context, segme
Int("target_segments_count", len(targetSegments)).
Str("group", mv.group).
Msg("measure part spans multiple target segments; switching to
row-replay")
+ // replayPart sends and confirms the part's rows through the bounded
pipeline,
+ // draining all in-flight confirmations before returning. A non-nil
error means
+ // some rows were not durably delivered; marking the part errored
(rather than
+ // completed) ensures the resume guard re-replays the whole part.
rowCount, err := replayer.replayPart(ctx, partPath)
if err != nil {
// Row-replay is all-or-nothing per part; mark the source part
errored so
// resume retries the whole part (same source key the guard
checks above).
+ recordReplayNodeErrors(mv.progress, mv.group, err)
mv.progress.MarkMeasurePartError(mv.group, sourceSegmentIDStr,
sourceShardID, partID, err.Error())
return fmt.Errorf("row-replay measure part %s: %w", partPath,
err)
}
- // Confirm this part's rows reached every node before marking it
completed.
- // replayPart only enqueues; the batch publisher is client-streaming so
- // per-node errors surface only when its stream closes, so
flushAndConfirm
- // closes the publisher to collect that result (then opens a fresh one
for the
- // next part). Marking before this confirmation could report success
for rows
- // a flush failure never delivered, and the resume guard would then
skip the
- // part, losing data.
- cee, flushErr := replayer.flushAndConfirm(ctx)
- if flushErr != nil || len(cee) > 0 {
- mv.progress.RecordRowReplayNodeErrors(mv.group, cee)
- confirmErr := flushErr
- if confirmErr == nil {
- confirmErr = fmt.Errorf("%d node error(s)", len(cee))
- }
- mv.progress.MarkMeasurePartError(mv.group, sourceSegmentIDStr,
sourceShardID, partID, confirmErr.Error())
- return fmt.Errorf("confirm row-replay measure part %s: %w",
partPath, confirmErr)
- }
mv.progress.MarkMeasurePartCompleted(mv.group, sourceSegmentIDStr,
sourceShardID, partID)
mv.progress.MarkSourceMeasurePartCompleted(mv.group, partPath,
sourceShardID, partID)
mv.progress.AddMeasureRowReplay(mv.group, rowCount)
diff --git a/banyand/backup/lifecycle/row_replay_measure.go
b/banyand/backup/lifecycle/row_replay_measure.go
index 80587334c..5966b7d65 100644
--- a/banyand/backup/lifecycle/row_replay_measure.go
+++ b/banyand/backup/lifecycle/row_replay_measure.go
@@ -23,7 +23,6 @@ import (
"path/filepath"
"strconv"
"sync"
- "sync/atomic"
"time"
"google.golang.org/protobuf/types/known/timestamppb"
@@ -64,8 +63,7 @@ type cachedMeasureSchema struct {
// real Write API, resolving each row's schema on demand via EntityValues.
type measureRowReplayer struct {
selector node.Selector
- client queue.Client
- publisher queue.BatchPublisher
+ sender *batchSender
fs fs.FileSystem
logger *logger.Logger
measureSchemas map[string]*databasev1.Measure
@@ -75,10 +73,8 @@ type measureRowReplayer struct {
counter *uint64
group string
irPath string
- batch []bus.Message
schemaCacheMu sync.Mutex
irMu sync.Mutex
- batchMu sync.Mutex
targetShardNum uint32
}
@@ -115,24 +111,20 @@ func newMeasureRowReplayer(
group: group,
targetShardNum: targetShardNum,
selector: selector,
- client: client,
- publisher:
client.NewBatchPublisher(measureReplayBatchTimeout),
+ sender: newBatchSender(client, data.TopicMeasureWrite,
measureReplayBatchSize, measureReplayBatchTimeout),
fs: fileSystem,
logger: l,
measureSchemas: measureSchemas,
schemaCache: make(map[string]*cachedMeasureSchema),
mergedRuleToTag: deriveMergedRuleToTag(measures, rules,
bindings),
counter: counter,
- batch: make([]bus.Message, 0, measureReplayBatchSize),
}, nil
}
-// Close flushes pending messages, closes the publisher and releases cached
-// IndexResolver handles.
+// Close drains any outstanding batch confirmations, closes the publisher and
+// releases cached IndexResolver handles.
func (r *measureRowReplayer) Close() (map[string]*common.Error, error) {
- flushCtx, cancel := context.WithTimeout(context.Background(),
measureReplayBatchTimeout)
- defer cancel()
- flushErr := r.flushBatch(flushCtx)
+ cee, closeErr := r.sender.close()
r.irMu.Lock()
if r.irResolver != nil {
_ = r.irResolver.Close()
@@ -140,25 +132,6 @@ func (r *measureRowReplayer) Close()
(map[string]*common.Error, error) {
r.irPath = ""
}
r.irMu.Unlock()
- cee, closeErr := r.publisher.Close()
- if flushErr != nil {
- return cee, flushErr
- }
- return cee, closeErr
-}
-
-// flushAndConfirm flushes the buffered batch, closes the publisher to obtain
the
-// per-node delivery result for everything sent since the last call, then
opens a
-// fresh publisher for subsequent parts. The batch publisher is
client-streaming,
-// so per-node errors are only observable once its stream closes; this lets a
-// row-replayed part be confirmed durable before it is marked completed.
-func (r *measureRowReplayer) flushAndConfirm(ctx context.Context)
(map[string]*common.Error, error) {
- flushErr := r.flushBatch(ctx)
- cee, closeErr := r.publisher.Close()
- r.publisher = r.client.NewBatchPublisher(measureReplayBatchTimeout)
- if flushErr != nil {
- return cee, flushErr
- }
return cee, closeErr
}
@@ -211,7 +184,9 @@ func (r *measureRowReplayer) loadIndexResolver(segmentPath
string) (*dump.IndexR
return ir, nil
}
-// replayPart opens a source part and publishes each row through the queue.
+// replayPart opens a source part and replays its rows through the sender,
which
+// sends and confirms batches through the bounded pipeline. The returned error
is
+// non-nil when any row build/route, iteration, or batch confirmation failed.
func (r *measureRowReplayer) replayPart(ctx context.Context, partPath string)
(int, error) {
partID, parseErr := strconv.ParseUint(filepath.Base(partPath), 16, 64)
if parseErr != nil {
@@ -234,38 +209,8 @@ func (r *measureRowReplayer) replayPart(ctx
context.Context, partPath string) (i
it := reader.Iterator()
defer it.Close()
-
- rowCount := 0
- for it.Next() {
- row := it.Row()
- if rowErr := r.publishRow(ctx, ir, row); rowErr != nil {
- pos := it.Position()
- r.logger.Warn().Err(rowErr).
- Str("group", r.group).
- Str("part", partPath).
- Int("block_idx", pos.BlockIdx).
- Int("row_idx", pos.RowIdx).
- Int("rows_published", rowCount).
- Msg("measure row-replay aborted mid-part on
publish error; will retry on resume")
- return rowCount, rowErr
- }
- rowCount++
- }
- if iterErr := it.Err(); iterErr != nil {
- pos := it.Position()
- r.logger.Warn().Err(iterErr).
- Str("group", r.group).
- Str("part", partPath).
- Int("block_idx", pos.BlockIdx).
- Int("row_idx", pos.RowIdx).
- Int("rows_published", rowCount).
- Msg("measure row-replay aborted mid-part; will retry on
resume")
- return rowCount, iterErr
- }
- if r.counter != nil {
- atomic.AddUint64(r.counter, 1)
- }
- return rowCount, nil
+ return r.sender.replay(ctx, r.logger, r.group, partPath, r.counter, it,
+ func() error { return r.publishRow(ctx, ir, it.Row()) })
}
// buildWriteRequest reconstructs the WriteRequest + InternalWriteRequest pair
@@ -308,7 +253,9 @@ func (r *measureRowReplayer) buildWriteRequest(
return wr, iwr, nil
}
-// publishRow rebuilds a WriteRequest from a measure Row and enqueues it.
+// publishRow rebuilds a WriteRequest from a measure Row and hands it to the
+// sender. The returned error is non-nil only on a build/route error or when a
+// full batch's asynchronous confirmation surfaced an earlier per-node failure.
func (r *measureRowReplayer) publishRow(ctx context.Context, ir
*dump.IndexResolver, row dumpmeasure.Row) error {
wr, iwr, err := r.buildWriteRequest(ir, row)
if err != nil {
@@ -319,30 +266,5 @@ func (r *measureRowReplayer) publishRow(ctx
context.Context, ir *dump.IndexResol
return fmt.Errorf("pick target node for %s: %w",
wr.Metadata.Name, err)
}
msg :=
bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, iwr)
- return r.enqueue(ctx, msg)
-}
-
-// enqueue appends a message to the batch and flushes when full.
-func (r *measureRowReplayer) enqueue(ctx context.Context, msg bus.Message)
error {
- r.batchMu.Lock()
- r.batch = append(r.batch, msg)
- shouldFlush := len(r.batch) >= measureReplayBatchSize
- r.batchMu.Unlock()
- if shouldFlush {
- return r.flushBatch(ctx)
- }
- return nil
-}
-
-func (r *measureRowReplayer) flushBatch(ctx context.Context) error {
- r.batchMu.Lock()
- if len(r.batch) == 0 {
- r.batchMu.Unlock()
- return nil
- }
- pending := r.batch
- r.batch = make([]bus.Message, 0, measureReplayBatchSize)
- r.batchMu.Unlock()
- _, err := r.publisher.Publish(ctx, data.TopicMeasureWrite, pending...)
- return err
+ return r.sender.enqueue(ctx, msg)
}
diff --git a/banyand/backup/lifecycle/row_replay_pipeline.go
b/banyand/backup/lifecycle/row_replay_pipeline.go
new file mode 100644
index 000000000..de0a21dd6
--- /dev/null
+++ b/banyand/backup/lifecycle/row_replay_pipeline.go
@@ -0,0 +1,351 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package lifecycle
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "runtime/debug"
+ "sync/atomic"
+ "time"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/banyand/internal/dump"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// rowReplayMaxInflight is the confirmPipeline depth: the maximum number of
+// row-replay batch confirmations tracked at once before the sender must block
+// on the oldest one. A depth of 2 lets one batch's receiver-side write overlap
+// with the building and sending of the next batch (double buffering) while
+// capping the extra sender/receiver memory and write contention.
+const rowReplayMaxInflight = 2
+
+// nodeReplayError describes why a row-replay batch failed. It separates a
+// global failure (err: the publish or close call itself failed, or a row could
+// not be built or routed) from per-node delivery failures (cee: rows the
+// receiver rejected, keyed by node). newNodeReplayError only builds one when at
+// least one of the two is set.
+type nodeReplayError struct {
+	err error
+	cee map[string]*common.Error
+}
+
+// Error prefers the global error's message; without one it summarizes how
+// many nodes reported a delivery failure.
+func (e *nodeReplayError) Error() string {
+	if e.err == nil {
+		return fmt.Sprintf("%d node error(s)", len(e.cee))
+	}
+	return e.err.Error()
+}
+
+// Unwrap exposes the global error (nil for a per-node-only failure) to
+// errors.Is / errors.As.
+func (e *nodeReplayError) Unwrap() error { return e.err }
+
+// newNodeReplayError folds a confirmed batch's outcome into an error. It
+// returns a literal nil when there is neither a global error nor any per-node
+// error; otherwise it returns a *nodeReplayError carrying both.
+func newNodeReplayError(cee map[string]*common.Error, err error) error {
+	if err != nil || len(cee) > 0 {
+		return &nodeReplayError{err: err, cee: cee}
+	}
+	return nil
+}
+
+// recordReplayNodeErrors copies the per-node delivery failures carried by err,
+// if any, into progress so the migration report lists them. Errors without
+// per-node detail (a global-only *nodeReplayError, a plain build/route error,
+// or nil) leave progress untouched.
+func recordReplayNodeErrors(progress *Progress, group string, err error) {
+	var replayErr *nodeReplayError
+	if !errors.As(err, &replayErr) || len(replayErr.cee) == 0 {
+		return
+	}
+	progress.RecordRowReplayNodeErrors(group, replayErr.cee)
+}
+
+// confirmPipeline bounds the number of in-flight (sent but unconfirmed)
+// row-replay batches. Each batch is confirmed on its own client-streaming
+// publisher in a background goroutine that delivers its error (nil on success)
+// on a channel; the pipeline keeps those channels in FIFO order so
+// confirmations are observed in send order. It is driven by a single goroutine
+// (the part's replay loop) and needs no internal locking.
+type confirmPipeline struct {
+	inflight []chan error
+	depth    int
+	// waited accumulates, over the pipeline's lifetime, the wall time the driving
+	// goroutine spent blocked on confirmations (freeing a slot in wait/add plus
+	// drain). Since that goroutine is either building or blocked here, it
+	// measures how long the sender was starved waiting on the receiver.
+	waited time.Duration
+}
+
+// newConfirmPipeline returns an empty pipeline bounded at rowReplayMaxInflight.
+func newConfirmPipeline() *confirmPipeline {
+	return &confirmPipeline{depth: rowReplayMaxInflight}
+}
+
+// wait frees an in-flight slot. When depth confirmations are already tracked
+// it blocks on the oldest one and returns its error; otherwise it returns nil
+// immediately. A non-nil error means an earlier batch failed and the part must
+// abort. Calling wait before sending a batch (and push after) keeps at most
+// depth batches sent but unconfirmed. An empty pipeline never blocks, so a
+// non-positive depth cannot index an empty slice.
+func (p *confirmPipeline) wait() error {
+	if len(p.inflight) == 0 || len(p.inflight) < p.depth {
+		return nil
+	}
+	start := time.Now()
+	awaited := <-p.inflight[0]
+	p.waited += time.Since(start)
+	p.inflight[0] = nil // release the channel; the backing array outlives the reslice
+	p.inflight = p.inflight[1:]
+	return awaited
+}
+
+// push registers a batch confirmation that will be delivered on ch. It never
+// blocks; pair it with a preceding wait to respect the depth bound.
+func (p *confirmPipeline) push(ch chan error) {
+	p.inflight = append(p.inflight, ch)
+}
+
+// add registers a batch confirmation that will be delivered on ch, first
+// waiting for the oldest in-flight confirmation when the pipeline is full so
+// the pipeline never tracks more than depth channels. It returns that awaited
+// error (a non-nil error means an earlier batch must abort the part).
+func (p *confirmPipeline) add(ch chan error) error {
+	awaited := p.wait()
+	p.push(ch)
+	return awaited
+}
+
+// drain waits for every remaining in-flight confirmation and returns the first
+// failing error (or nil when all succeeded). It is safe to call more than once;
+// a drained pipeline is empty.
+func (p *confirmPipeline) drain() error {
+	var first error
+	for _, ch := range p.inflight {
+		start := time.Now()
+		out := <-ch
+		p.waited += time.Since(start)
+		if first == nil && out != nil {
+			first = out
+		}
+	}
+	p.inflight = nil
+	return first
+}
+
+// batchSender buffers row-replay messages and, once a batch fills, sends it on a
+// client-streaming publisher and confirms it asynchronously through a bounded
+// confirmPipeline. Each batch is published on its own publisher (rotated in
+// immediately) and confirmed in the background, so one batch's receiver-side
+// write overlaps with building and sending the next. It owns the whole publisher
+// lifecycle (open, rotate, close) so the per-data-type replayers only build
+// messages and enqueue them. A sender is driven by a single goroutine (the
+// part's replay loop): only it touches the buffer, and each background
+// confirmation touches just its own captured publisher and channel, so no
+// buffer locking is needed.
+type batchSender struct {
+	client    queue.Client
+	publisher queue.BatchPublisher
+	pipeline  *confirmPipeline
+	batch     []bus.Message
+	topic     bus.Topic
+	timeout   time.Duration
+	batchSize int
+}
+
+// newBatchSender builds a sender for one data type and opens its first
+// publisher. batchSize (rows per batch) and timeout (publisher timeout) are
+// per-type knobs each replayer (measure/stream/trace) supplies from its own
+// constants, so they can be tuned independently.
+func newBatchSender(client queue.Client, topic bus.Topic, batchSize int, timeout time.Duration) *batchSender {
+	return &batchSender{
+		client:    client,
+		publisher: client.NewBatchPublisher(timeout),
+		pipeline:  newConfirmPipeline(),
+		topic:     topic,
+		batch:     make([]bus.Message, 0, batchSize),
+		timeout:   timeout,
+		batchSize: batchSize,
+	}
+}
+
+// enqueue buffers msg and, when the batch is full, flushes it. The returned
+// error comes only from that flush: either the batch's synchronous send error
+// or an earlier batch's failure surfaced while waiting for an in-flight slot.
+func (s *batchSender) enqueue(ctx context.Context, msg bus.Message) error {
+	s.batch = append(s.batch, msg)
+	if len(s.batch) >= s.batchSize {
+		return s.flush(ctx)
+	}
+	return nil
+}
+
+// flush sends the buffered batch on the current publisher, rotates in a fresh
+// publisher for the next batch, and confirms the sent batch (closing its stream
+// to collect the per-node delivery result) in the background. It frees an
+// in-flight slot before sending, so the pipeline bound already holds at send
+// time. The returned error aborts the part promptly: an earlier batch's
+// failure surfaced by that wait (which takes priority, preserving send order),
+// otherwise this batch's synchronous send error. It is a no-op when the buffer
+// is empty.
+func (s *batchSender) flush(ctx context.Context) error {
+	pending := s.take()
+	if len(pending) == 0 {
+		return nil
+	}
+	// Wait for a free slot BEFORE sending: waiting only after Publish would
+	// leave depth+1 batches sent but unconfirmed at once, one more than
+	// rowReplayMaxInflight allows. The batch is still sent when an earlier
+	// failure is surfaced here, keeping the publish/rotation sequence the same;
+	// the caller aborts the part right after and re-replays it on resume.
+	awaited := s.pipeline.wait()
+	_, pubErr := s.publisher.Publish(ctx, s.topic, pending...)
+	pub := s.publisher
+	s.publisher = s.client.NewBatchPublisher(s.timeout)
+	ch := make(chan error, 1)
+	go func() {
+		// The pipeline blocks in wait/drain until exactly one value arrives on
+		// ch, so the send must happen on every path: a deferred send (with
+		// recover) guarantees it even if pub.Close panics, turning a would-be
+		// deadlock into a surfaced error. ch is buffered, so the send never
+		// blocks this goroutine.
+		var out error
+		defer func() {
+			if rec := recover(); rec != nil {
+				out = fmt.Errorf("row-replay batch confirm panicked: %v\n%s", rec, debug.Stack())
+			}
+			ch <- out
+		}()
+		cee, closeErr := pub.Close()
+		err := pubErr
+		if err == nil {
+			err = closeErr
+		} else {
+			cee = nil // publish failed — receiver saw nothing, so per-node cee is noise
+		}
+		out = newNodeReplayError(cee, err)
+	}()
+	s.pipeline.push(ch)
+	if awaited != nil {
+		return awaited
+	}
+	if pubErr != nil {
+		return newNodeReplayError(nil, pubErr)
+	}
+	return nil
+}
+
+// drain waits for every in-flight batch confirmation and returns the first
+// failure (or nil when all succeeded).
+func (s *batchSender) drain() error {
+	return s.pipeline.drain()
+}
+
+// rowCursor is the minimal cursor the replay driver pulls rows through: Next
+// advances, Err is checked once iteration stops, and Position locates the
+// current row for abort logging. The emit closure reads the current row
+// itself, so the element type never appears here and *dump.Iterator[T]
+// satisfies the interface for any T.
+type rowCursor interface {
+	Position() dump.Position
+	Next() bool
+	Err() error
+}
+
+// replay drives one part's row-replay: it pulls rows from cur, sends each through
+// emit (which builds the message and enqueues it on this sender), flushes the
+// trailing batch, then drains the pipeline so the part is reported durable only
+// once every batch is confirmed. counter (when non-nil) is incremented once per
+// successfully replayed part, and a per-part build_send vs confirm_wait timing
+// line is logged on success.
+//
+// It returns the number of rows emit accepted before any failure, which is not
+// necessarily the number of rows delivered. The error is non-nil when any row
+// build/route, iteration, trailing flush, or batch confirmation failed; the
+// caller maps that to its progress bookkeeping. Only emit and iteration
+// failures are logged here.
+func (s *batchSender) replay(ctx context.Context, l *logger.Logger, group, part string, counter *uint64,
+	cur rowCursor, emit func() error,
+) (int, error) {
+	start := time.Now()
+	// Snapshot the pipeline's cumulative wait so only this part's share is
+	// reported in the timing line.
+	waited0 := s.pipeline.waited
+	// warnAbort logs where in the part (block/row position) replay stopped.
+	warnAbort := func(rows int, err error) {
+		pos := cur.Position()
+		l.Warn().Err(err).
+			Str("group", group).Str("part", part).
+			Int("block_idx", pos.BlockIdx).Int("row_idx", pos.RowIdx).
+			Int("rows_published", rows).
+			Msg("row-replay aborted mid-part; will retry on resume")
+	}
+	rowCount := 0
+	var failure error
+	for cur.Next() {
+		if err := emit(); err != nil {
+			warnAbort(rowCount, err)
+			failure = err
+			break
+		}
+		rowCount++
+	}
+	// Only flush the trailing partial batch when every row was emitted and the
+	// cursor ended cleanly; an aborted part's residual rows are dropped below.
+	if failure == nil {
+		if iterErr := cur.Err(); iterErr != nil {
+			warnAbort(rowCount, iterErr)
+			failure = iterErr
+		} else {
+			failure = s.flush(ctx)
+		}
+	}
+	// Wait for every in-flight batch confirmation before returning so the part is
+	// only reported durable once all of its rows are acknowledged. drain runs even
+	// after a failure so no confirmation is left outstanding; the earlier failure
+	// keeps priority over whatever drain reports.
+	if drained := s.drain(); failure == nil {
+		failure = drained
+	}
+	if failure != nil {
+		// Drop the failed part's un-flushed residual rows (an abort skips the
+		// trailing flush) so a later close() cannot resurrect rows the caller was
+		// told were not delivered; the whole part is re-replayed on resume.
+		s.discard()
+		return rowCount, failure
+	}
+	if counter != nil {
+		atomic.AddUint64(counter, 1)
+	}
+	logRowReplayTiming(l, group, part, rowCount, time.Since(start), s.pipeline.waited-waited0)
+	return rowCount, nil
+}
+
+// logRowReplayTiming writes one info line for a replayed part, splitting its
+// wall time into build_send (time not spent waiting on confirmations: decode,
+// route, marshal, publish) and confirm_wait (time blocked on the receiver).
+// A dominant confirm_wait implicates the receiver; a dominant build_send, the
+// sender. Parts that replayed no rows are not logged.
+func logRowReplayTiming(l *logger.Logger, group, part string, rows int, total, wait time.Duration) {
+	if rows == 0 {
+		return
+	}
+	// Clamp at zero: wait is measured separately and can exceed total.
+	var buildSend time.Duration
+	if total > wait {
+		buildSend = total - wait
+	}
+	event := l.Info()
+	event.
+		Str("group", group).
+		Str("part", part).
+		Int("rows", rows).
+		Dur("total", total).
+		Dur("build_send", buildSend).
+		Dur("confirm_wait", wait).
+		Msg("row-replay part timing")
+}
+
+// close waits for any outstanding confirmations and closes the idle publisher.
+// By construction the buffer is empty here: every replay flushes its trailing
+// batch on success or discards it on failure, so close never needs to send.
+// When a drained batch failed, its result is merged with the idle publisher's
+// own close result instead of silently dropping the latter. Per-node errors
+// from both are combined, with the failed batch's entry winning for a node
+// present in both. The global errors are joined, yielding nil when neither is
+// set.
+func (s *batchSender) close() (map[string]*common.Error, error) {
+	drained := s.drain()
+	cee, closeErr := s.publisher.Close()
+	if drained == nil {
+		return cee, closeErr
+	}
+	var nre *nodeReplayError
+	if !errors.As(drained, &nre) {
+		// e.g. a recovered panic from a confirm goroutine, which is a plain error
+		return cee, errors.Join(drained, closeErr)
+	}
+	merged := nre.cee
+	if len(cee) > 0 {
+		merged = make(map[string]*common.Error, len(nre.cee)+len(cee))
+		for node, nodeErr := range cee {
+			merged[node] = nodeErr
+		}
+		for node, nodeErr := range nre.cee {
+			merged[node] = nodeErr // the failed batch's error takes precedence
+		}
+	}
+	return merged, errors.Join(nre.err, closeErr)
+}
+
+// take hands the buffered batch to the caller and starts a fresh buffer with
+// the configured capacity. An empty buffer is left in place and nil returned.
+func (s *batchSender) take() []bus.Message {
+	if len(s.batch) > 0 {
+		out := s.batch
+		s.batch = make([]bus.Message, 0, s.batchSize)
+		return out
+	}
+	return nil
+}
+
+// discard throws away any rows still buffered (the residual of a failed part)
+// so they are never sent. Like take, it only replaces the buffer when it
+// actually holds rows.
+func (s *batchSender) discard() {
+	if len(s.batch) == 0 {
+		return
+	}
+	s.batch = make([]bus.Message, 0, s.batchSize)
+}
diff --git a/banyand/backup/lifecycle/row_replay_stream.go
b/banyand/backup/lifecycle/row_replay_stream.go
index 9bcb3347c..92db82ec1 100644
--- a/banyand/backup/lifecycle/row_replay_stream.go
+++ b/banyand/backup/lifecycle/row_replay_stream.go
@@ -24,7 +24,6 @@ import (
"path/filepath"
"strconv"
"sync"
- "sync/atomic"
"time"
"google.golang.org/protobuf/types/known/timestamppb"
@@ -59,8 +58,7 @@ type cachedStreamSchema struct {
// streamRowReplayer replays a group's stream parts row-by-row.
type streamRowReplayer struct {
selector node.Selector
- client queue.Client
- publisher queue.BatchPublisher
+ sender *batchSender
metadata metadata.Repo
fs fs.FileSystem
logger *logger.Logger
@@ -69,10 +67,8 @@ type streamRowReplayer struct {
counter *uint64
group string
irPath string
- batch []bus.Message
schemaCacheMu sync.Mutex
irMu sync.Mutex
- batchMu sync.Mutex
targetShardNum uint32
}
@@ -86,23 +82,19 @@ func newStreamRowReplayer(
group: group,
targetShardNum: targetShardNum,
selector: selector,
- client: client,
- publisher:
client.NewBatchPublisher(streamReplayBatchTimeout),
+ sender: newBatchSender(client, data.TopicStreamWrite,
streamReplayBatchSize, streamReplayBatchTimeout),
metadata: md,
fs: fileSystem,
logger: l,
schemaCache: make(map[string]*cachedStreamSchema),
counter: counter,
- batch: make([]bus.Message, 0, streamReplayBatchSize),
}
}
-// Close flushes pending messages, closes the publisher and releases cached
-// IndexResolver handles.
+// Close drains any outstanding batch confirmations, closes the publisher and
+// releases cached IndexResolver handles.
func (r *streamRowReplayer) Close() (map[string]*common.Error, error) {
- flushCtx, cancel := context.WithTimeout(context.Background(),
streamReplayBatchTimeout)
- defer cancel()
- flushErr := r.flushBatch(flushCtx)
+ cee, closeErr := r.sender.close()
r.irMu.Lock()
if r.irResolver != nil {
_ = r.irResolver.Close()
@@ -110,25 +102,6 @@ func (r *streamRowReplayer) Close()
(map[string]*common.Error, error) {
r.irPath = ""
}
r.irMu.Unlock()
- cee, closeErr := r.publisher.Close()
- if flushErr != nil {
- return cee, flushErr
- }
- return cee, closeErr
-}
-
-// flushAndConfirm flushes the buffered batch, closes the publisher to obtain
the
-// per-node delivery result for everything sent since the last call, then
opens a
-// fresh publisher for subsequent parts. The batch publisher is
client-streaming,
-// so per-node errors are only observable once its stream closes; this lets a
-// row-replayed part be confirmed durable before it is marked completed.
-func (r *streamRowReplayer) flushAndConfirm(ctx context.Context)
(map[string]*common.Error, error) {
- flushErr := r.flushBatch(ctx)
- cee, closeErr := r.publisher.Close()
- r.publisher = r.client.NewBatchPublisher(streamReplayBatchTimeout)
- if flushErr != nil {
- return cee, flushErr
- }
return cee, closeErr
}
@@ -198,38 +171,8 @@ func (r *streamRowReplayer) replayPart(ctx
context.Context, partPath string) (in
it := reader.Iterator()
defer it.Close()
-
- rowCount := 0
- for it.Next() {
- row := it.Row()
- if rowErr := r.publishRow(ctx, row); rowErr != nil {
- pos := it.Position()
- r.logger.Warn().Err(rowErr).
- Str("group", r.group).
- Str("part", partPath).
- Int("block_idx", pos.BlockIdx).
- Int("row_idx", pos.RowIdx).
- Int("rows_published", rowCount).
- Msg("stream row-replay aborted mid-part on
publish error; will retry on resume")
- return rowCount, rowErr
- }
- rowCount++
- }
- if iterErr := it.Err(); iterErr != nil {
- pos := it.Position()
- r.logger.Warn().Err(iterErr).
- Str("group", r.group).
- Str("part", partPath).
- Int("block_idx", pos.BlockIdx).
- Int("row_idx", pos.RowIdx).
- Int("rows_published", rowCount).
- Msg("stream row-replay aborted mid-part; will retry on
resume")
- return rowCount, iterErr
- }
- if r.counter != nil {
- atomic.AddUint64(r.counter, 1)
- }
- return rowCount, nil
+ return r.sender.replay(ctx, r.logger, r.group, partPath, r.counter, it,
+ func() error { return r.publishRow(ctx, it.Row()) })
}
// buildWriteRequest reconstructs the WriteRequest + InternalWriteRequest pair
@@ -277,30 +220,5 @@ func (r *streamRowReplayer) publishRow(ctx
context.Context, row dumpstream.Row)
return fmt.Errorf("pick target node for %s: %w",
wr.Metadata.Name, err)
}
msg :=
bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, iwr)
- return r.enqueue(ctx, msg)
-}
-
-// enqueue appends a message to the batch and flushes when full.
-func (r *streamRowReplayer) enqueue(ctx context.Context, msg bus.Message)
error {
- r.batchMu.Lock()
- r.batch = append(r.batch, msg)
- shouldFlush := len(r.batch) >= streamReplayBatchSize
- r.batchMu.Unlock()
- if shouldFlush {
- return r.flushBatch(ctx)
- }
- return nil
-}
-
-func (r *streamRowReplayer) flushBatch(ctx context.Context) error {
- r.batchMu.Lock()
- if len(r.batch) == 0 {
- r.batchMu.Unlock()
- return nil
- }
- pending := r.batch
- r.batch = make([]bus.Message, 0, streamReplayBatchSize)
- r.batchMu.Unlock()
- _, err := r.publisher.Publish(ctx, data.TopicStreamWrite, pending...)
- return err
+ return r.sender.enqueue(ctx, msg)
}
diff --git a/banyand/backup/lifecycle/row_replay_test.go
b/banyand/backup/lifecycle/row_replay_test.go
index 5aa2931b1..56ee9cb85 100644
--- a/banyand/backup/lifecycle/row_replay_test.go
+++ b/banyand/backup/lifecycle/row_replay_test.go
@@ -24,15 +24,18 @@ import (
gofs "io/fs"
"path/filepath"
"strconv"
+ "sync/atomic"
"testing"
"time"
"github.com/onsi/gomega"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+ "go.uber.org/mock/gomock"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
+ "github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/data"
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
@@ -966,3 +969,550 @@ func findRoundtripPartDirs(root string) []string {
})
return dirs
}
+
+// countingBatchPublisher is a queue.BatchPublisher fake that records how many
+// times it was published to and closed, and how many messages each Publish
+// carried. Close runs in the replayer's background confirmation goroutines, so
+// the counters are atomic. closeCee injects a per-node delivery failure,
+// publishErr injects a synchronous send failure, and panicOnClose makes Close
+// panic to exercise the confirm goroutine's panic-safety.
+type countingBatchPublisher struct {
+ closeCee map[string]*common.Error
+ publishErr error
+ panicOnClose bool
+ publishCount atomic.Int32
+ publishedRows atomic.Int32
+ closeCount atomic.Int32
+}
+
+func (f *countingBatchPublisher) Publish(_ context.Context, _ bus.Topic,
messages ...bus.Message) (bus.Future, error) {
+ f.publishCount.Add(1)
+ f.publishedRows.Add(int32(len(messages)))
+ return nil, f.publishErr
+}
+
+func (f *countingBatchPublisher) Close() (map[string]*common.Error, error) {
+ f.closeCount.Add(1)
+ if f.panicOnClose {
+ panic("simulated publisher close panic")
+ }
+ return f.closeCee, nil
+}
+
+// senderReturning wires a batchSender whose successive publishers are the
given
+// ones in order, then fresh clean publishers once the list is exhausted. It
+// centralizes the mock-controller + NewBatchPublisher boilerplate and the
+// "first publisher carries the first batch" rotation order for the sender
tests.
+// The returned accessor reports every publisher actually vended (provided and
+// fresh) — including appends that happen after this returns — so a test can
+// assert the exact number vended and inspect each, catching extra
+// rotations/publishes that a pre-sized expectation would miss.
+func senderReturning(t *testing.T, pubs ...*countingBatchPublisher)
(*batchSender, func() []*countingBatchPublisher) {
+ t.Helper()
+ ctrl := gomock.NewController(t)
+ t.Cleanup(ctrl.Finish)
+ mockClient := queue.NewMockClient(ctrl)
+ var vended []*countingBatchPublisher
+ i := 0
+ mockClient.EXPECT().NewBatchPublisher(measureReplayBatchTimeout).
+ DoAndReturn(func(time.Duration) queue.BatchPublisher {
+ var p *countingBatchPublisher
+ if i < len(pubs) {
+ p, i = pubs[i], i+1
+ } else {
+ p = &countingBatchPublisher{}
+ }
+ vended = append(vended, p)
+ return p
+ }).AnyTimes()
+ return newBatchSender(mockClient, data.TopicMeasureWrite,
measureReplayBatchSize, measureReplayBatchTimeout),
+ func() []*countingBatchPublisher { return vended }
+}
+
+// totalPublished sums the publish counts across the given publishers.
+func totalPublished(pubs []*countingBatchPublisher) int32 {
+ var total int32
+ for _, p := range pubs {
+ total += p.publishCount.Load()
+ }
+ return total
+}
+
+// totalPublishedRows sums the message counts actually handed to Publish across
+// the given publishers, i.e. how many source rows were really sent.
+func totalPublishedRows(pubs []*countingBatchPublisher) int32 {
+ var total int32
+ for _, p := range pubs {
+ total += p.publishedRows.Load()
+ }
+ return total
+}
+
+// cleanEmit returns a replay emit callback that enqueues one dummy row per
call.
+func cleanEmit(s *batchSender) func() error {
+ return func() error {
+ return s.enqueue(context.TODO(),
+ bus.NewBatchMessageWithNode(bus.MessageID(1), "node-1",
&measurev1.InternalWriteRequest{}))
+ }
+}
+
+// failingEmit is cleanEmit that instead returns a build error on the failAt-th
+// call, aborting the part with whatever was buffered/sent so far.
+func failingEmit(s *batchSender, failAt int) func() error {
+ clean := cleanEmit(s)
+ calls := 0
+ return func() error {
+ calls++
+ if calls == failAt {
+ return fmt.Errorf("build failed")
+ }
+ return clean()
+ }
+}
+
+// nodeErrCee extracts the per-node delivery errors carried by a failed replay
+// error, failing the test if err is nil or not a *nodeReplayError.
+func nodeErrCee(t *testing.T, err error) map[string]*common.Error {
+ t.Helper()
+ var nre *nodeReplayError
+ require.ErrorAs(t, err, &nre)
+ return nre.cee
+}
+
+// TestConfirmPipeline_BoundsInflightDepth proves the pipeline never keeps more
+// than depth confirmations outstanding: a (depth+1)th add blocks until the
+// oldest in-flight confirmation resolves, and drain surfaces the first
failure.
+func TestConfirmPipeline_BoundsInflightDepth(t *testing.T) {
+ // rowReplayMaxInflight is 2, so a 3rd unconfirmed batch must wait.
+ p := newConfirmPipeline()
+ ch1 := make(chan error, 1)
+ ch2 := make(chan error, 1)
+ ch3 := make(chan error, 1)
+
+ // Within depth: add must not block and has nothing to surface yet.
+ require.NoError(t, p.add(ch1))
+ require.NoError(t, p.add(ch2))
+
+ // Full (2 in flight): the 3rd add must block until the oldest (ch1)
resolves.
+ added := make(chan error, 1)
+ go func() { added <- p.add(ch3) }()
+ select {
+ case <-added:
+ t.Fatal("add exceeded the in-flight depth without waiting for
the oldest confirmation")
+ case <-time.After(100 * time.Millisecond):
+ }
+ ch1 <- nil // oldest confirms cleanly
+ select {
+ case out := <-added:
+ require.NoError(t, out, "a clean oldest confirmation must not
report failure")
+ case <-time.After(2 * time.Second):
+ t.Fatal("add did not unblock after the oldest confirmation
resolved")
+ }
+
+ // drain waits for the rest and returns the first failing error.
+ ch2 <- nil
+ ch3 <- newNodeReplayError(map[string]*common.Error{"node-1":
common.NewError("boom")}, nil)
+ require.Contains(t, nodeErrCee(t, p.drain()), "node-1")
+}
+
+// enqueueN feeds n messages through the sender, failing the test on any error
+// surfaced early by the in-flight bound.
+func enqueueN(t *testing.T, s *batchSender, n int) {
+ t.Helper()
+ emit := cleanEmit(s)
+ for i := 0; i < n; i++ {
+ require.NoError(t, emit())
+ }
+}
+
+// TestBatchSender_SendsRotatesAndConfirmsEachBatch verifies the sender flow:
+// every full batch is published once on its own publisher, the publisher is
+// rotated for the next batch, and each sent batch is confirmed (closed)
exactly
+// once, with all confirmations drained before the part is considered durable.
+func TestBatchSender_SendsRotatesAndConfirmsEachBatch(t *testing.T) {
+ const batches = 3
+ s, vended := senderReturning(t)
+ enqueueN(t, s, batches*measureReplayBatchSize)
+ require.NoError(t, s.drain())
+
+ require.Len(t, vended(), batches+1, "exactly the initial publisher plus
one rotation per sent batch")
+ for i := 0; i < batches; i++ {
+ require.Equalf(t, int32(1), vended()[i].publishCount.Load(),
"publisher %d must send its batch once", i)
+ require.Equalf(t, int32(1), vended()[i].closeCount.Load(),
"publisher %d must be confirmed (closed) once", i)
+ }
+ require.Equal(t, int32(0), vended()[batches].publishCount.Load(), "the
staged publisher holds no batch yet")
+}
+
+// TestBatchSender_SurfacesBatchNodeError verifies a per-node delivery failure
on
+// a confirmed batch propagates out of the sender (via the final drain) so the
+// caller can mark the part errored and retry it on resume.
+func TestBatchSender_SurfacesBatchNodeError(t *testing.T) {
+ failing := &countingBatchPublisher{closeCee:
map[string]*common.Error{"node-1": common.NewError("flush timeout")}}
+ s, _ := senderReturning(t, failing) // the first publisher carries the
batch and fails
+ enqueueN(t, s, measureReplayBatchSize)
+ require.Contains(t, nodeErrCee(t, s.drain()), "node-1",
+ "a batch's per-node delivery error must surface when drained")
+}
+
+// fakeCursor yields n rows for the replay driver, with a zero Position. err,
when
+// set, is returned by Err() after the rows are exhausted to exercise the
+// iterator-error path.
+type fakeCursor struct {
+ err error
+ n int
+ i int
+}
+
+func (c *fakeCursor) Next() bool { c.i++; return c.i <= c.n }
+func (c *fakeCursor) Err() error { return c.err }
+func (c *fakeCursor) Position() dump.Position { return dump.Position{} }
+
+// TestBatchSender_SurfacesClosePanic pins the panic-safety of the confirm
+// goroutine: if a publisher's Close panics, the goroutine must still deliver a
+// (failed) error so the pipeline never deadlocks. Without the deferred
+// recover+send this would hang (or crash), so the bounded wait turns a
+// regression into a failed assertion rather than a hung test.
+func TestBatchSender_SurfacesClosePanic(t *testing.T) {
+ s, _ := senderReturning(t, &countingBatchPublisher{panicOnClose: true})
+ enqueueN(t, s, measureReplayBatchSize) // one full batch on the
panicking publisher
+
+ done := make(chan error, 1)
+ go func() { done <- s.drain() }()
+ select {
+ case out := <-done:
+ require.ErrorContains(t, out, "panic", "a panicking Close must
surface as an error")
+ case <-time.After(2 * time.Second):
+ t.Fatal("drain hung: the confirm goroutine did not deliver an
error after Close panicked")
+ }
+}
+
+// TestConfirmPipeline_DrainKeepsFirstFailure pins that drain returns the
EARLIEST
+// failing error, not a later one, when several in-flight batches fail.
+func TestConfirmPipeline_DrainKeepsFirstFailure(t *testing.T) {
+ p := newConfirmPipeline()
+ ch1 := make(chan error, 1)
+ ch2 := make(chan error, 1)
+ require.NoError(t, p.add(ch1))
+ require.NoError(t, p.add(ch2))
+
+ ch1 <- fmt.Errorf("first failure")
+ ch2 <- fmt.Errorf("second failure")
+ require.ErrorContains(t, p.drain(), "first failure", "drain must keep
the earliest failure")
+}
+
+// TestBatchSender_SurfacesFailureViaInflightBound pins the realistic
mid-stream
+// abort path: a batch's failure is surfaced through the in-flight bound's
+// add-await (i.e. out of enqueue), not only at the final drain.
+func TestBatchSender_SurfacesFailureViaInflightBound(t *testing.T) {
+ failing := &countingBatchPublisher{closeCee:
map[string]*common.Error{"node-1": common.NewError("boom")}}
+ s, _ := senderReturning(t, failing) // batch 1 fails; later batches are
clean
+
+ emit := cleanEmit(s)
+ var surfaced error
+ for i := 0; i < 3*measureReplayBatchSize; i++ {
+ if out := emit(); out != nil {
+ surfaced = out
+ }
+ }
+ require.Contains(t, nodeErrCee(t, surfaced), "node-1",
+ "batch 1's failure must surface via the in-flight bound during
enqueue, not only at drain")
+ s.drain()
+}
+
+// TestReplay_RowCountBoundaries pins the driver's batching across small and
large
+// parts: for every part size the reported row count is exact, a successful
part
+// (even an empty one) counts once, the number of published batches matches the
+// 2000-row boundary, and a subsequent close never publishes.
+func TestReplay_RowCountBoundaries(t *testing.T) {
+ cases := []struct {
+ name string
+ rows int
+ wantBatches int
+ }{
+ {"empty", 0, 0},
+ {"single_row", 1, 1},
+ {"just_under_one_batch", measureReplayBatchSize - 1, 1},
+ {"exactly_one_batch", measureReplayBatchSize, 1},
+ {"one_over_one_batch", measureReplayBatchSize + 1, 2},
+ {"exactly_two_batches", 2 * measureReplayBatchSize, 2},
+ {"two_batches_and_a_tail", 2*measureReplayBatchSize + 5, 3},
+ }
+ for _, tc := range cases {
+ t.Run(tc.name, func(t *testing.T) {
+ s, vended := senderReturning(t)
+ var counter uint64
+ rows, err := s.replay(context.TODO(),
logger.GetLogger("test-replayer"), "g", "p", &counter, &fakeCursor{n: tc.rows},
cleanEmit(s))
+ require.NoError(t, err)
+ require.Equal(t, tc.rows, rows, "rowCount must equal
the source row count")
+ require.Equal(t, uint64(1), counter, "every successful
part counts once, even when empty")
+ // Sum over EVERY vended publisher so an extra batch on
a rotated-beyond
+ // publisher is still counted, and pin the exact
rotation count.
+ require.Equal(t, int32(tc.wantBatches),
totalPublished(vended()), "published batch count")
+ require.Len(t, vended(), tc.wantBatches+1, "exactly one
publisher per batch plus the staged one")
+ // Every source row must reach Publish exactly once: no
row dropped at a
+ // batch boundary and none lost in the trailing tail.
+ require.Equal(t, int32(tc.rows),
totalPublishedRows(vended()), "every source row must be published exactly once")
+
+ _, cerr := s.close()
+ require.NoError(t, cerr)
+ require.Equal(t, int32(tc.wantBatches),
totalPublished(vended()), "close must not publish after a successful part")
+ })
+ }
+}
+
+// TestReplay_BatchPlusTailSendsEveryRow pins the "a few rows past a flush
+// boundary" case: with two full batches plus a short tail, each full batch
+// carries exactly batchSize messages, the tail batch carries the remainder,
the
+// staged publisher carries nothing, and the grand total equals the row count —
+// so no row is dropped at a boundary nor lost in the trailing tail.
+func TestReplay_BatchPlusTailSendsEveryRow(t *testing.T) {
+ const tail = 5
+ const rows = 2*measureReplayBatchSize + tail
+ s, vended := senderReturning(t)
+ var counter uint64
+ got, err := s.replay(context.TODO(), logger.GetLogger("test-replayer"),
"g", "p", &counter, &fakeCursor{n: rows}, cleanEmit(s))
+ require.NoError(t, err)
+ require.Equal(t, rows, got, "rowCount must equal the source row count")
+
+ pubs := vended()
+ require.Len(t, pubs, 4, "two full batches + the tail batch + the staged
publisher")
+ require.Equal(t, int32(measureReplayBatchSize),
pubs[0].publishedRows.Load(), "first full batch carries batchSize rows")
+ require.Equal(t, int32(measureReplayBatchSize),
pubs[1].publishedRows.Load(), "second full batch carries batchSize rows")
+ require.Equal(t, int32(tail), pubs[2].publishedRows.Load(), "the
trailing batch carries exactly the tail")
+ require.Equal(t, int32(0), pubs[3].publishedRows.Load(), "the staged
publisher holds nothing")
+ require.Equal(t, int32(rows), totalPublishedRows(pubs), "every row sent
exactly once")
+
+ _, cerr := s.close()
+ require.NoError(t, cerr)
+}
+
+// TestReplay_AbortDrainsInflightAndDiscards pins the abort path across small
and
+// large parts and at the batch boundary: whatever full batches were already
sent
+// are still confirmed (drained), the un-flushed residual is discarded (never
+// resurrected by a later close), the failed part is not counted, and no row
of an
+// aborted part is published beyond the batches that left before the failure.
+func TestReplay_AbortDrainsInflightAndDiscards(t *testing.T) {
+ cases := []struct {
+ name string
+ failAt int // the 1-based emit call that returns a
build error
+ wantRows int // rows counted before the failure
+ wantSentBatches int // full batches flushed before the failure
+ }{
+ {"first_row_nothing_buffered", 1, 0, 0},
+ {"small_residual_no_batch", 2, 1, 0},
+ {"empty_residual_at_batch_boundary", measureReplayBatchSize +
1, measureReplayBatchSize, 1},
+ {"residual_after_two_batches", 2*measureReplayBatchSize + 501,
2*measureReplayBatchSize + 500, 2},
+ }
+ for _, tc := range cases {
+ t.Run(tc.name, func(t *testing.T) {
+ s, vended := senderReturning(t)
+ var counter uint64
+ rows, err := s.replay(context.TODO(),
logger.GetLogger("test-replayer"), "g", "p", &counter, &fakeCursor{n:
tc.failAt}, failingEmit(s, tc.failAt))
+ require.Error(t, err, "the part must report failure")
+ require.Equal(t, tc.wantRows, rows, "rowCount counts
the rows enqueued before the failure")
+ require.Zero(t, counter, "a failed part must not be
counted")
+
+ require.Len(t, vended(), tc.wantSentBatches+1, "one
publisher per sent batch plus the staged one")
+ for i := 0; i < tc.wantSentBatches; i++ {
+ require.Equalf(t, int32(1),
vended()[i].closeCount.Load(), "sent batch %d must be confirmed (drained)", i)
+ }
+ require.Equal(t, int32(tc.wantSentBatches),
totalPublished(vended()), "only the batches sent before the failure are
published")
+ require.Nil(t, s.take(), "the residual must be
discarded")
+ _, cerr := s.close()
+ require.NoError(t, cerr)
+ })
+ }
+}
+
+// TestReplay_IteratorErrorDrainsAndDiscards covers the distinct abort branch
where
+// the source iterator fails after yielding rows (cur.Err() != nil). The
already-
+// sent batches must still be drained, the residual discarded, and the part
failed
+// — so a truncated part is never committed as durable.
+func TestReplay_IteratorErrorDrainsAndDiscards(t *testing.T) {
+ s, vended := senderReturning(t)
+ const n = 2*measureReplayBatchSize + 300 // two full batches sent, 300
buffered, then the iterator errors
+ var counter uint64
+ rows, err := s.replay(context.TODO(),
logger.GetLogger("test-replayer"), "g", "p", &counter,
+ &fakeCursor{n: n, err: fmt.Errorf("iterator boom")},
cleanEmit(s))
+ require.ErrorContains(t, err, "iterator boom", "an iterator error must
fail the part")
+ require.Equal(t, n, rows, "all rows read before the iterator error are
counted")
+ require.Zero(t, counter, "a failed part must not be counted")
+
+ require.Len(t, vended(), 3, "two sent batches plus the staged one")
+ require.Equal(t, int32(1), vended()[0].closeCount.Load(), "sent batch 0
must be drained")
+ require.Equal(t, int32(1), vended()[1].closeCount.Load(), "sent batch 1
must be drained")
+ require.Equal(t, int32(0), vended()[2].publishCount.Load(), "the
residual must not be published")
+ require.Nil(t, s.take(), "the residual must be discarded")
+}
+
+// TestReplay_NodeErrorViaInflightBoundDrivesAbort covers the composition of a
+// per-node confirm failure surfacing through the in-flight bound's add-await
+// while driven by replay (not raw enqueue): the part aborts mid-scan,
in-flight
+// batches drain, and the residual is discarded.
+func TestReplay_NodeErrorViaInflightBoundDrivesAbort(t *testing.T) {
+ failing := &countingBatchPublisher{closeCee:
map[string]*common.Error{"node-1": common.NewError("boom")}}
+ s, _ := senderReturning(t, failing) // batch 1's confirm fails
+ var counter uint64
+ rows, err := s.replay(context.TODO(),
logger.GetLogger("test-replayer"), "g", "p", &counter,
+ &fakeCursor{n: 3 * measureReplayBatchSize}, cleanEmit(s))
+ require.Contains(t, nodeErrCee(t, err), "node-1", "the node error must
abort the part")
+ require.Zero(t, counter, "a failed part must not be counted")
+ // Batch 1's failure surfaces only once a later batch's flush awaits it
at the
+ // in-flight bound, so the part aborts mid-scan: after more than one
batch's
+ // worth of rows but before consuming all of them.
+ require.Greater(t, rows, measureReplayBatchSize, "aborted after more
than one batch")
+ require.Less(t, rows, 3*measureReplayBatchSize, "aborted mid-scan, not
after consuming every row")
+ require.Nil(t, s.take(), "the residual must be discarded")
+ _, cerr := s.close()
+ require.NoError(t, cerr)
+}
+
+// TestRecordReplayNodeErrors_ReportsPartialFailure pins the partial-failure
+// report: the per-node delivery errors a failed batch carries are recorded
into
+// the progress report keyed by node (even through fmt wrapping), while a
+// global-only failure and a plain non-nodeReplayError add no node entries — so
+// the resume report reflects exactly which nodes rejected rows and nothing
else.
+func TestRecordReplayNodeErrors_ReportsPartialFailure(t *testing.T) {
+ require.NoError(t, logger.Init(logger.Logging{Env: "dev", Level:
"warn"}))
+ l := logger.GetLogger("test")
+
+ t.Run("per_node_errors_recorded", func(t *testing.T) {
+ p := NewProgress(filepath.Join(t.TempDir(), "progress.json"), l)
+ err := newNodeReplayError(map[string]*common.Error{
+ "node-2": common.NewError("flush timeout"),
+ "node-5": common.NewError("shard rejected"),
+ }, nil)
+ recordReplayNodeErrors(p, "g", err)
+ require.Contains(t, p.RowReplayNodeErrors, "g")
+ require.Len(t, p.RowReplayNodeErrors["g"], 2, "exactly the
failing nodes are reported")
+ require.Contains(t, p.RowReplayNodeErrors["g"]["node-2"],
"flush timeout")
+ require.Contains(t, p.RowReplayNodeErrors["g"]["node-5"],
"shard rejected")
+ require.NotContains(t, p.RowReplayNodeErrors["g"], "node-1", "a
node that did not fail must not be reported")
+ })
+
+ t.Run("wrapped_error_still_unwrapped", func(t *testing.T) {
+ p := NewProgress(filepath.Join(t.TempDir(), "progress.json"), l)
+ inner := newNodeReplayError(map[string]*common.Error{"node-7":
common.NewError("boom")}, nil)
+ recordReplayNodeErrors(p, "g", fmt.Errorf("row-replay measure
part p: %w", inner))
+ require.Contains(t, p.RowReplayNodeErrors["g"], "node-7",
"errors.As must find node errors through fmt wrapping")
+ })
+
+ t.Run("global_only_error_records_nothing", func(t *testing.T) {
+ p := NewProgress(filepath.Join(t.TempDir(), "progress.json"), l)
+ recordReplayNodeErrors(p, "g", newNodeReplayError(nil,
fmt.Errorf("stream broke")))
+ require.NotContains(t, p.RowReplayNodeErrors, "g", "a
global-only failure must not add node entries")
+ })
+
+ t.Run("plain_error_records_nothing", func(t *testing.T) {
+ p := NewProgress(filepath.Join(t.TempDir(), "progress.json"), l)
+ recordReplayNodeErrors(p, "g", fmt.Errorf("open part failed"))
+ require.NotContains(t, p.RowReplayNodeErrors, "g", "a
non-nodeReplayError must not be reported as node failures")
+ })
+}
+
+// TestBatchSender_EachBatchGetsItsOwnTimeoutWindow pins the fix for the
original
+// migration timeout bug: every batch is published on its OWN publisher opened
+// with a fresh copy of the configured timeout, so the window is per-batch. A
slow
+// gap between two sends can never eat into the next batch's window the way a
+// single long-lived stream's lifetime once did. A 5s-timeout sender sends two
+// full batches; we assert a fresh NewBatchPublisher(5s) backs each batch
(initial
+// publisher + one rotation per batch), every call uses the full 5s (not a
shrunk
+// or shared window), and the two batches ride distinct publishers.
+func TestBatchSender_EachBatchGetsItsOwnTimeoutWindow(t *testing.T) {
+ const (
+ timeout = 5 * time.Second
+ batchSize = 2
+ )
+ ctrl := gomock.NewController(t)
+ t.Cleanup(ctrl.Finish)
+ mockClient := queue.NewMockClient(ctrl)
+ var timeouts []time.Duration
+ var pubs []*countingBatchPublisher
+ mockClient.EXPECT().NewBatchPublisher(gomock.Any()).
+ DoAndReturn(func(d time.Duration) queue.BatchPublisher {
+ timeouts = append(timeouts, d)
+ p := &countingBatchPublisher{}
+ pubs = append(pubs, p)
+ return p
+ }).AnyTimes()
+
+ s := newBatchSender(mockClient, data.TopicMeasureWrite, batchSize,
timeout)
+ for i := 0; i < 2*batchSize; i++ { // two full batches
+ require.NoError(t, s.enqueue(context.TODO(),
+ bus.NewBatchMessageWithNode(bus.MessageID(1), "node-1",
&measurev1.InternalWriteRequest{})))
+ }
+ require.NoError(t, s.drain())
+
+ // Initial publisher + one rotation per sent batch = 3
NewBatchPublisher calls,
+ // and EVERY one must request the full configured timeout (a fresh 5s
window),
+ // proving the two sends do not share one stream's single 5s lifetime.
+ require.Len(t, timeouts, 3, "each batch must open its own publisher,
not share one")
+ for i, d := range timeouts {
+ require.Equalf(t, timeout, d, "publisher %d must open with the
full configured timeout, not a shared/shrunk one", i)
+ }
+ require.Len(t, pubs, 3)
+ require.Equal(t, int32(1), pubs[0].publishCount.Load(), "batch 1 rides
its own publisher")
+ require.Equal(t, int32(1), pubs[1].publishCount.Load(), "batch 2 rides
a different publisher")
+ require.Equal(t, int32(0), pubs[2].publishCount.Load(), "the staged
publisher holds nothing")
+ require.NotSame(t, pubs[0], pubs[1], "the two batches must not share a
publisher (nor its timeout window)")
+
+ _, cerr := s.close()
+ require.NoError(t, cerr)
+}
+
+// TestNodeReplayError_ErrorAndUnwrap pins the error type directly: its two
+// message forms (global cause vs per-node count) and that Unwrap exposes the
+// global cause so errors.Is/As chains reach it even through fmt wrapping.
+func TestNodeReplayError_ErrorAndUnwrap(t *testing.T) {
+ // Global-error form: Error() is the wrapped cause, and Unwrap exposes
it.
+ cause := fmt.Errorf("publish failed")
+ globalErr := newNodeReplayError(nil, cause)
+ require.EqualError(t, globalErr, "publish failed")
+ require.ErrorIs(t, globalErr, cause, "Unwrap must expose the global
cause to errors.Is")
+ require.ErrorIs(t, fmt.Errorf("row-replay measure part p: %w",
globalErr), cause,
+ "the global cause must stay reachable through fmt wrapping")
+
+ // Node-error form (cee only, no global err): Error() reports the node
count
+ // and there is no global cause to unwrap.
+ nodeErr := newNodeReplayError(map[string]*common.Error{
+ "node-1": common.NewError("boom"),
+ "node-2": common.NewError("boom"),
+ }, nil)
+ require.EqualError(t, nodeErr, "2 node error(s)")
+ require.NotErrorIs(t, nodeErr, cause, "a node-only error has no global
cause to unwrap")
+}
+
+// TestBatchSender_CloseSurfacesDrainedFailure covers close()'s failure branch:
+// when an in-flight batch failed, close() drains it and returns that batch's
+// per-node errors instead of the idle publisher's clean result.
+func TestBatchSender_CloseSurfacesDrainedFailure(t *testing.T) {
+ failing := &countingBatchPublisher{closeCee:
map[string]*common.Error{"node-9": common.NewError("boom")}}
+ s, _ := senderReturning(t, failing) // the first publisher carries
the batch and fails
+ enqueueN(t, s, measureReplayBatchSize) // one full batch sent, left
in-flight (undrained)
+ cee, err := s.close() // close() must drain it and
surface the node error
+ require.NoError(t, err, "a node-only failure carries no global error")
+ require.Contains(t, cee, "node-9", "close must return the drained
batch's per-node errors")
+}
+
+// TestReplay_SurfacesSynchronousPublishErrorPromptly pins that a synchronous
+// Publish failure aborts the part on the flush that triggered it — not
deferred
+// until the in-flight bound or the final drain. With the failing publisher
+// carrying the first batch, replay must stop right after that batch instead of
+// scanning ~depth more batches: the old code returned the error only via the
+// bound, so it would have consumed ~3 batches before aborting.
+func TestReplay_SurfacesSynchronousPublishErrorPromptly(t *testing.T) {
+ pub := &countingBatchPublisher{publishErr: fmt.Errorf("stream send
broke")}
+ s, _ := senderReturning(t, pub) // the first batch's Publish fails
synchronously
+ var counter uint64
+ rows, err := s.replay(context.TODO(),
logger.GetLogger("test-replayer"), "g", "p", &counter,
+ &fakeCursor{n: 3 * measureReplayBatchSize}, cleanEmit(s))
+ require.ErrorContains(t, err, "stream send broke", "a synchronous
Publish error must abort the part")
+ require.Zero(t, counter, "a failed part must not be counted")
+ // Row 2000's emit triggers the failing flush and returns its error, so
the loop
+ // breaks before counting it: exactly one batch worth minus the failing
row, far
+ // short of the 3 batches the deferred (bound-only) behavior would have
scanned.
+ require.Equal(t, measureReplayBatchSize-1, rows, "replay must abort on
the failing flush, not keep scanning")
+ require.Nil(t, s.take(), "the residual must be discarded")
+ _, cerr := s.close()
+ require.NoError(t, cerr)
+}
diff --git a/banyand/backup/lifecycle/row_replay_trace.go
b/banyand/backup/lifecycle/row_replay_trace.go
index 4d4f85704..937eb012d 100644
--- a/banyand/backup/lifecycle/row_replay_trace.go
+++ b/banyand/backup/lifecycle/row_replay_trace.go
@@ -22,8 +22,6 @@ import (
"fmt"
"path/filepath"
"strconv"
- "sync"
- "sync/atomic"
"time"
"github.com/apache/skywalking-banyandb/api/common"
@@ -53,16 +51,13 @@ const (
// returns an error.
type traceRowReplayer struct {
selector node.Selector
- client queue.Client
- publisher queue.BatchPublisher
+ sender *batchSender
fs fs.FileSystem
logger *logger.Logger
schema *databasev1.Trace
traceName string
counter *uint64
group string
- batch []bus.Message
- batchMu sync.Mutex
targetShardNum uint32
}
@@ -88,42 +83,18 @@ func newTraceRowReplayer(
group: group,
targetShardNum: targetShardNum,
selector: selector,
- client: client,
- publisher:
client.NewBatchPublisher(traceReplayBatchTimeout),
+ sender: newBatchSender(client, data.TopicTraceWrite,
traceReplayBatchSize, traceReplayBatchTimeout),
fs: fileSystem,
logger: l,
schema: t,
traceName: t.Metadata.Name,
counter: counter,
- batch: make([]bus.Message, 0, traceReplayBatchSize),
}, nil
}
-// Close flushes pending messages and closes the publisher.
+// Close drains any outstanding batch confirmations and closes the publisher.
func (r *traceRowReplayer) Close() (map[string]*common.Error, error) {
- flushCtx, cancel := context.WithTimeout(context.Background(),
traceReplayBatchTimeout)
- defer cancel()
- flushErr := r.flushBatch(flushCtx)
- cee, closeErr := r.publisher.Close()
- if flushErr != nil {
- return cee, flushErr
- }
- return cee, closeErr
-}
-
-// flushAndConfirm flushes the buffered batch, closes the publisher to obtain
the
-// per-node delivery result for everything sent since the last call, then
opens a
-// fresh publisher for subsequent shards. The batch publisher is
client-streaming,
-// so per-node errors are only observable once its stream closes; this lets a
-// row-replayed shard be confirmed durable before it is marked completed.
-func (r *traceRowReplayer) flushAndConfirm(ctx context.Context)
(map[string]*common.Error, error) {
- flushErr := r.flushBatch(ctx)
- cee, closeErr := r.publisher.Close()
- r.publisher = r.client.NewBatchPublisher(traceReplayBatchTimeout)
- if flushErr != nil {
- return cee, flushErr
- }
- return cee, closeErr
+ return r.sender.close()
}
func (r *traceRowReplayer) replayPart(ctx context.Context, partPath string)
(int, error) {
@@ -141,40 +112,10 @@ func (r *traceRowReplayer) replayPart(ctx
context.Context, partPath string) (int
it := reader.Iterator()
defer it.Close()
-
- rowCount := 0
- for it.Next() {
- row := it.Row()
- if rowErr := r.publishRow(ctx, row); rowErr != nil {
- pos := it.Position()
- r.logger.Warn().Err(rowErr).
- Str("group", r.group).
- Str("trace", r.traceName).
- Str("part", partPath).
- Int("block_idx", pos.BlockIdx).
- Int("row_idx", pos.RowIdx).
- Int("rows_published", rowCount).
- Msg("trace row-replay aborted mid-part on
publish error; will retry on resume")
- return rowCount, rowErr
- }
- rowCount++
- }
- if iterErr := it.Err(); iterErr != nil {
- pos := it.Position()
- r.logger.Warn().Err(iterErr).
- Str("group", r.group).
- Str("trace", r.traceName).
- Str("part", partPath).
- Int("block_idx", pos.BlockIdx).
- Int("row_idx", pos.RowIdx).
- Int("rows_published", rowCount).
- Msg("trace row-replay aborted mid-part; will retry on
resume")
- return rowCount, iterErr
- }
- if r.counter != nil {
- atomic.AddUint64(r.counter, 1)
- }
- return rowCount, nil
+ // Prefix the part label with the trace name so the shared driver's
timing and
+ // abort logs keep the trace identity the old per-type logs carried.
+ return r.sender.replay(ctx, r.logger, r.group,
r.traceName+"/"+partPath, r.counter, it,
+ func() error { return r.publishRow(ctx, it.Row()) })
}
// buildWriteRequest reconstructs the WriteRequest + InternalWriteRequest pair
@@ -204,30 +145,5 @@ func (r *traceRowReplayer) publishRow(ctx context.Context,
row dumptrace.Row) er
return fmt.Errorf("pick target node for trace %s: %w",
r.traceName, err)
}
msg :=
bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, iwr)
- return r.enqueue(ctx, msg)
-}
-
-// enqueue appends a message to the batch and flushes when full.
-func (r *traceRowReplayer) enqueue(ctx context.Context, msg bus.Message) error
{
- r.batchMu.Lock()
- r.batch = append(r.batch, msg)
- shouldFlush := len(r.batch) >= traceReplayBatchSize
- r.batchMu.Unlock()
- if shouldFlush {
- return r.flushBatch(ctx)
- }
- return nil
-}
-
-func (r *traceRowReplayer) flushBatch(ctx context.Context) error {
- r.batchMu.Lock()
- if len(r.batch) == 0 {
- r.batchMu.Unlock()
- return nil
- }
- pending := r.batch
- r.batch = make([]bus.Message, 0, traceReplayBatchSize)
- r.batchMu.Unlock()
- _, err := r.publisher.Publish(ctx, data.TopicTraceWrite, pending...)
- return err
+ return r.sender.enqueue(ctx, msg)
}
diff --git a/banyand/backup/lifecycle/stream_migration_visitor.go
b/banyand/backup/lifecycle/stream_migration_visitor.go
index a229ae076..c0aa6f69c 100644
--- a/banyand/backup/lifecycle/stream_migration_visitor.go
+++ b/banyand/backup/lifecycle/stream_migration_visitor.go
@@ -382,30 +382,18 @@ func (mv *streamMigrationVisitor) visitPartRowReplay(ctx
context.Context, segmen
Int("target_segments_count", len(targetSegments)).
Str("group", mv.group).
Msg("stream part spans multiple target segments; switching to
row-replay")
+ // replayPart sends and confirms the part's rows through the bounded
pipeline,
+ // draining all in-flight confirmations before returning. A non-nil
error means
+ // some rows were not durably delivered; marking the part errored
(rather than
+ // completed) ensures the resume guard re-replays the whole part.
rowCount, err := replayer.replayPart(ctx, partPath)
if err != nil {
// Row-replay is all-or-nothing per part; mark the source part
errored so
// resume retries the whole part (same source key the guard
checks above).
+ recordReplayNodeErrors(mv.progress, mv.group, err)
mv.progress.MarkStreamPartError(mv.group, sourceSegmentIDStr,
sourceShardID, partID, err.Error())
return fmt.Errorf("row-replay stream part %s: %w", partPath,
err)
}
- // Confirm this part's rows reached every node before marking it
completed.
- // replayPart only enqueues; the batch publisher is client-streaming so
- // per-node errors surface only when its stream closes, so
flushAndConfirm
- // closes the publisher to collect that result (then opens a fresh one
for the
- // next part). Marking before this confirmation could report success
for rows
- // a flush failure never delivered, and the resume guard would then
skip the
- // part, losing data.
- cee, flushErr := replayer.flushAndConfirm(ctx)
- if flushErr != nil || len(cee) > 0 {
- mv.progress.RecordRowReplayNodeErrors(mv.group, cee)
- confirmErr := flushErr
- if confirmErr == nil {
- confirmErr = fmt.Errorf("%d node error(s)", len(cee))
- }
- mv.progress.MarkStreamPartError(mv.group, sourceSegmentIDStr,
sourceShardID, partID, confirmErr.Error())
- return fmt.Errorf("confirm row-replay stream part %s: %w",
partPath, confirmErr)
- }
mv.progress.MarkStreamPartCompleted(mv.group, sourceSegmentIDStr,
sourceShardID, partID)
mv.progress.MarkSourceStreamPartCompleted(mv.group, partPath,
sourceShardID, partID)
mv.progress.AddStreamRowReplay(mv.group, rowCount)
diff --git a/banyand/backup/lifecycle/trace_migration_visitor.go
b/banyand/backup/lifecycle/trace_migration_visitor.go
index b033b4d42..933ffe4e1 100644
--- a/banyand/backup/lifecycle/trace_migration_visitor.go
+++ b/banyand/backup/lifecycle/trace_migration_visitor.go
@@ -385,32 +385,20 @@ func (mv *traceMigrationVisitor) visitShardRowReplay(ctx
context.Context, source
continue
}
partPath := filepath.Join(shardPath, name)
- rowCount, replayErr := replayer.replayPart(ctx, partPath)
- if replayErr != nil {
- mv.progress.MarkTraceShardError(mv.group, segmentIDStr,
sourceShardID, replayErr.Error())
- return fmt.Errorf("row-replay trace part %s: %w",
partPath, replayErr)
+ // replayPart sends and confirms the part's rows through the
bounded pipeline,
+ // draining all in-flight confirmations before returning. A
non-nil error
+ // marks the whole shard errored so resume re-replays it, never
skipping a
+ // shard whose rows were not durably delivered.
+ rowCount, partErr := replayer.replayPart(ctx, partPath)
+ if partErr != nil {
+ recordReplayNodeErrors(mv.progress, mv.group, partErr)
+ mv.progress.MarkTraceShardError(mv.group, segmentIDStr,
sourceShardID, partErr.Error())
+ return fmt.Errorf("row-replay trace part %s: %w",
partPath, partErr)
}
totalRows += rowCount
partsReplayed++
}
- // Confirm this shard's rows reached every node before marking it
completed.
- // replayPart only enqueues; the batch publisher is client-streaming so
- // per-node errors surface only when its stream closes, so
flushAndConfirm
- // closes the publisher to collect that result (then opens a fresh one
for the
- // next shard). Marking before this confirmation could report success
for rows
- // a flush failure never delivered, and the resume guard would then
skip the
- // whole shard, losing data.
- cee, flushErr := replayer.flushAndConfirm(ctx)
- if flushErr != nil || len(cee) > 0 {
- mv.progress.RecordRowReplayNodeErrors(mv.group, cee)
- confirmErr := flushErr
- if confirmErr == nil {
- confirmErr = fmt.Errorf("%d node error(s)", len(cee))
- }
- mv.progress.MarkTraceShardError(mv.group, segmentIDStr,
sourceShardID, confirmErr.Error())
- return fmt.Errorf("confirm row-replay trace shard %s: %w",
shardPath, confirmErr)
- }
mv.progress.MarkTraceShardCompleted(mv.group, segmentIDStr,
sourceShardID)
mv.progress.MarkSourceTraceShardCompleted(mv.group, segmentIDStr,
sourceShardID)
mv.progress.AddTraceRowReplay(mv.group, partsReplayed, totalRows)