This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new b2be017c8 Fix migration failure when row replay from large data (#1154)
b2be017c8 is described below

commit b2be017c8373477a840d46a68fd3ef7b6728e601
Author: mrproliu <[email protected]>
AuthorDate: Thu Jun 4 16:50:42 2026 +0800

    Fix migration failure when row replay from large data (#1154)
---
 .../backup/lifecycle/measure_migration_visitor.go  |  22 +-
 banyand/backup/lifecycle/row_replay_measure.go     | 106 +---
 banyand/backup/lifecycle/row_replay_pipeline.go    | 351 +++++++++++++
 banyand/backup/lifecycle/row_replay_stream.go      |  98 +---
 banyand/backup/lifecycle/row_replay_test.go        | 550 +++++++++++++++++++++
 banyand/backup/lifecycle/row_replay_trace.go       | 102 +---
 .../backup/lifecycle/stream_migration_visitor.go   |  22 +-
 .../backup/lifecycle/trace_migration_visitor.go    |  30 +-
 8 files changed, 951 insertions(+), 330 deletions(-)

diff --git a/banyand/backup/lifecycle/measure_migration_visitor.go b/banyand/backup/lifecycle/measure_migration_visitor.go
index 128c77c09..b462ee33c 100644
--- a/banyand/backup/lifecycle/measure_migration_visitor.go
+++ b/banyand/backup/lifecycle/measure_migration_visitor.go
@@ -392,30 +392,18 @@ func (mv *measureMigrationVisitor) visitPartRowReplay(ctx context.Context, segme
                Int("target_segments_count", len(targetSegments)).
                Str("group", mv.group).
                Msg("measure part spans multiple target segments; switching to 
row-replay")
+       // replayPart sends and confirms the part's rows through the bounded 
pipeline,
+       // draining all in-flight confirmations before returning. A non-nil 
error means
+       // some rows were not durably delivered; marking the part errored 
(rather than
+       // completed) ensures the resume guard re-replays the whole part.
        rowCount, err := replayer.replayPart(ctx, partPath)
        if err != nil {
                // Row-replay is all-or-nothing per part; mark the source part 
errored so
                // resume retries the whole part (same source key the guard 
checks above).
+               recordReplayNodeErrors(mv.progress, mv.group, err)
                mv.progress.MarkMeasurePartError(mv.group, sourceSegmentIDStr, 
sourceShardID, partID, err.Error())
                return fmt.Errorf("row-replay measure part %s: %w", partPath, 
err)
        }
-       // Confirm this part's rows reached every node before marking it 
completed.
-       // replayPart only enqueues; the batch publisher is client-streaming so
-       // per-node errors surface only when its stream closes, so 
flushAndConfirm
-       // closes the publisher to collect that result (then opens a fresh one 
for the
-       // next part). Marking before this confirmation could report success 
for rows
-       // a flush failure never delivered, and the resume guard would then 
skip the
-       // part, losing data.
-       cee, flushErr := replayer.flushAndConfirm(ctx)
-       if flushErr != nil || len(cee) > 0 {
-               mv.progress.RecordRowReplayNodeErrors(mv.group, cee)
-               confirmErr := flushErr
-               if confirmErr == nil {
-                       confirmErr = fmt.Errorf("%d node error(s)", len(cee))
-               }
-               mv.progress.MarkMeasurePartError(mv.group, sourceSegmentIDStr, 
sourceShardID, partID, confirmErr.Error())
-               return fmt.Errorf("confirm row-replay measure part %s: %w", 
partPath, confirmErr)
-       }
        mv.progress.MarkMeasurePartCompleted(mv.group, sourceSegmentIDStr, 
sourceShardID, partID)
        mv.progress.MarkSourceMeasurePartCompleted(mv.group, partPath, 
sourceShardID, partID)
        mv.progress.AddMeasureRowReplay(mv.group, rowCount)
diff --git a/banyand/backup/lifecycle/row_replay_measure.go b/banyand/backup/lifecycle/row_replay_measure.go
index 80587334c..5966b7d65 100644
--- a/banyand/backup/lifecycle/row_replay_measure.go
+++ b/banyand/backup/lifecycle/row_replay_measure.go
@@ -23,7 +23,6 @@ import (
        "path/filepath"
        "strconv"
        "sync"
-       "sync/atomic"
        "time"
 
        "google.golang.org/protobuf/types/known/timestamppb"
@@ -64,8 +63,7 @@ type cachedMeasureSchema struct {
 // real Write API, resolving each row's schema on demand via EntityValues.
 type measureRowReplayer struct {
        selector        node.Selector
-       client          queue.Client
-       publisher       queue.BatchPublisher
+       sender          *batchSender
        fs              fs.FileSystem
        logger          *logger.Logger
        measureSchemas  map[string]*databasev1.Measure
@@ -75,10 +73,8 @@ type measureRowReplayer struct {
        counter         *uint64
        group           string
        irPath          string
-       batch           []bus.Message
        schemaCacheMu   sync.Mutex
        irMu            sync.Mutex
-       batchMu         sync.Mutex
        targetShardNum  uint32
 }
 
@@ -115,24 +111,20 @@ func newMeasureRowReplayer(
                group:           group,
                targetShardNum:  targetShardNum,
                selector:        selector,
-               client:          client,
-               publisher:       
client.NewBatchPublisher(measureReplayBatchTimeout),
+               sender:          newBatchSender(client, data.TopicMeasureWrite, 
measureReplayBatchSize, measureReplayBatchTimeout),
                fs:              fileSystem,
                logger:          l,
                measureSchemas:  measureSchemas,
                schemaCache:     make(map[string]*cachedMeasureSchema),
                mergedRuleToTag: deriveMergedRuleToTag(measures, rules, 
bindings),
                counter:         counter,
-               batch:           make([]bus.Message, 0, measureReplayBatchSize),
        }, nil
 }
 
-// Close flushes pending messages, closes the publisher and releases cached
-// IndexResolver handles.
+// Close drains any outstanding batch confirmations, closes the publisher and
+// releases cached IndexResolver handles.
 func (r *measureRowReplayer) Close() (map[string]*common.Error, error) {
-       flushCtx, cancel := context.WithTimeout(context.Background(), 
measureReplayBatchTimeout)
-       defer cancel()
-       flushErr := r.flushBatch(flushCtx)
+       cee, closeErr := r.sender.close()
        r.irMu.Lock()
        if r.irResolver != nil {
                _ = r.irResolver.Close()
@@ -140,25 +132,6 @@ func (r *measureRowReplayer) Close() (map[string]*common.Error, error) {
                r.irPath = ""
        }
        r.irMu.Unlock()
-       cee, closeErr := r.publisher.Close()
-       if flushErr != nil {
-               return cee, flushErr
-       }
-       return cee, closeErr
-}
-
-// flushAndConfirm flushes the buffered batch, closes the publisher to obtain 
the
-// per-node delivery result for everything sent since the last call, then 
opens a
-// fresh publisher for subsequent parts. The batch publisher is 
client-streaming,
-// so per-node errors are only observable once its stream closes; this lets a
-// row-replayed part be confirmed durable before it is marked completed.
-func (r *measureRowReplayer) flushAndConfirm(ctx context.Context) 
(map[string]*common.Error, error) {
-       flushErr := r.flushBatch(ctx)
-       cee, closeErr := r.publisher.Close()
-       r.publisher = r.client.NewBatchPublisher(measureReplayBatchTimeout)
-       if flushErr != nil {
-               return cee, flushErr
-       }
        return cee, closeErr
 }
 
@@ -211,7 +184,9 @@ func (r *measureRowReplayer) loadIndexResolver(segmentPath string) (*dump.IndexR
        return ir, nil
 }
 
-// replayPart opens a source part and publishes each row through the queue.
+// replayPart opens a source part and replays its rows through the sender, 
which
+// sends and confirms batches through the bounded pipeline. The returned error 
is
+// non-nil when any row build/route, iteration, or batch confirmation failed.
 func (r *measureRowReplayer) replayPart(ctx context.Context, partPath string) 
(int, error) {
        partID, parseErr := strconv.ParseUint(filepath.Base(partPath), 16, 64)
        if parseErr != nil {
@@ -234,38 +209,8 @@ func (r *measureRowReplayer) replayPart(ctx context.Context, partPath string) (i
 
        it := reader.Iterator()
        defer it.Close()
-
-       rowCount := 0
-       for it.Next() {
-               row := it.Row()
-               if rowErr := r.publishRow(ctx, ir, row); rowErr != nil {
-                       pos := it.Position()
-                       r.logger.Warn().Err(rowErr).
-                               Str("group", r.group).
-                               Str("part", partPath).
-                               Int("block_idx", pos.BlockIdx).
-                               Int("row_idx", pos.RowIdx).
-                               Int("rows_published", rowCount).
-                               Msg("measure row-replay aborted mid-part on 
publish error; will retry on resume")
-                       return rowCount, rowErr
-               }
-               rowCount++
-       }
-       if iterErr := it.Err(); iterErr != nil {
-               pos := it.Position()
-               r.logger.Warn().Err(iterErr).
-                       Str("group", r.group).
-                       Str("part", partPath).
-                       Int("block_idx", pos.BlockIdx).
-                       Int("row_idx", pos.RowIdx).
-                       Int("rows_published", rowCount).
-                       Msg("measure row-replay aborted mid-part; will retry on 
resume")
-               return rowCount, iterErr
-       }
-       if r.counter != nil {
-               atomic.AddUint64(r.counter, 1)
-       }
-       return rowCount, nil
+       return r.sender.replay(ctx, r.logger, r.group, partPath, r.counter, it,
+               func() error { return r.publishRow(ctx, ir, it.Row()) })
 }
 
 // buildWriteRequest reconstructs the WriteRequest + InternalWriteRequest pair
@@ -308,7 +253,9 @@ func (r *measureRowReplayer) buildWriteRequest(
        return wr, iwr, nil
 }
 
-// publishRow rebuilds a WriteRequest from a measure Row and enqueues it.
+// publishRow rebuilds a WriteRequest from a measure Row and hands it to the
+// sender. The returned error is non-nil only on a build/route error or when a
+// full batch's asynchronous confirmation surfaced an earlier per-node failure.
 func (r *measureRowReplayer) publishRow(ctx context.Context, ir 
*dump.IndexResolver, row dumpmeasure.Row) error {
        wr, iwr, err := r.buildWriteRequest(ir, row)
        if err != nil {
@@ -319,30 +266,5 @@ func (r *measureRowReplayer) publishRow(ctx context.Context, ir *dump.IndexResol
                return fmt.Errorf("pick target node for %s: %w", 
wr.Metadata.Name, err)
        }
        msg := 
bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, iwr)
-       return r.enqueue(ctx, msg)
-}
-
-// enqueue appends a message to the batch and flushes when full.
-func (r *measureRowReplayer) enqueue(ctx context.Context, msg bus.Message) 
error {
-       r.batchMu.Lock()
-       r.batch = append(r.batch, msg)
-       shouldFlush := len(r.batch) >= measureReplayBatchSize
-       r.batchMu.Unlock()
-       if shouldFlush {
-               return r.flushBatch(ctx)
-       }
-       return nil
-}
-
-func (r *measureRowReplayer) flushBatch(ctx context.Context) error {
-       r.batchMu.Lock()
-       if len(r.batch) == 0 {
-               r.batchMu.Unlock()
-               return nil
-       }
-       pending := r.batch
-       r.batch = make([]bus.Message, 0, measureReplayBatchSize)
-       r.batchMu.Unlock()
-       _, err := r.publisher.Publish(ctx, data.TopicMeasureWrite, pending...)
-       return err
+       return r.sender.enqueue(ctx, msg)
 }
diff --git a/banyand/backup/lifecycle/row_replay_pipeline.go b/banyand/backup/lifecycle/row_replay_pipeline.go
new file mode 100644
index 000000000..de0a21dd6
--- /dev/null
+++ b/banyand/backup/lifecycle/row_replay_pipeline.go
@@ -0,0 +1,351 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package lifecycle
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "runtime/debug"
+       "sync/atomic"
+       "time"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/banyand/internal/dump"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
+       "github.com/apache/skywalking-banyandb/pkg/bus"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// rowReplayMaxInflight bounds how many row-replay batches may be sent but not
+// yet confirmed at once (newConfirmPipeline uses it as the pipeline depth). A
+// depth of 2 lets one batch's receiver-side write overlap with building and
+// sending the next (double buffering) while capping extra sender/receiver memory and write contention.
+const rowReplayMaxInflight = 2
+
+// nodeReplayError reports a failed row-replay batch. It distinguishes a global
+// send/confirm failure (err: the batch's Publish call or its publisher's Close failed)
+// from per-node delivery failures (cee: the receiver rejected rows on specific nodes,
+// keyed by node). Row build/route, iteration and confirm-panic errors are plain errors, not this type.
+// At least one field is set whenever this error is returned; newNodeReplayError yields nil when neither is.
+type nodeReplayError struct {
+       err error                    // global failure: the Publish error, else the Close error
+       cee map[string]*common.Error // per-node errors from Close (dropped when Publish failed)
+}
+
+func (e *nodeReplayError) Error() string { // err's message when set, else a per-node failure count
+       if e.err != nil {
+               return e.err.Error()
+       }
+       return fmt.Sprintf("%d node error(s)", len(e.cee))
+}
+
+func (e *nodeReplayError) Unwrap() error { // exposes err (nil for per-node-only failures) to errors.Is/As
+       return e.err
+}
+
+// newNodeReplayError reduces a batch's send/confirm result to an error: nil when
+// the batch fully succeeded (no global error and no per-node errors), otherwise
+// a *nodeReplayError carrying whichever failures occurred.
+func newNodeReplayError(cee map[string]*common.Error, err error) error {
+       if err == nil && len(cee) == 0 {
+               return nil // untyped nil, so callers' err != nil checks see success (no typed-nil interface)
+       }
+       return &nodeReplayError{cee: cee, err: err}
+}
+
+// recordReplayNodeErrors passes any per-node delivery errors carried by err to
+// progress.RecordRowReplayNodeErrors for group (presumably so they surface in the migration report).
+// It is a no-op for nil, for errors that do not wrap a *nodeReplayError, and for a global-only one.
+func recordReplayNodeErrors(progress *Progress, group string, err error) {
+       var nre *nodeReplayError
+       if errors.As(err, &nre) && len(nre.cee) > 0 {
+               progress.RecordRowReplayNodeErrors(group, nre.cee)
+       }
+}
+
+// confirmPipeline bounds the number of in-flight (sent but unconfirmed)
+// row-replay batches. Each batch is confirmed on its own client-streaming
+// publisher in a background goroutine that delivers its error (nil on success)
+// on a channel; the pipeline keeps those channels in FIFO order so
+// confirmations are observed in send order. It assumes a single driving goroutine
+// (the part's replay loop) and therefore has no internal locking.
+type confirmPipeline struct {
+       inflight []chan error // pending confirmations, oldest first
+       depth    int          // cap on len(inflight)
+       // waited accumulates, over the pipeline's lifetime, the wall time the driving
+       // goroutine spent blocked on confirmations (the in-flight bound in add plus
+       // drain). Since that goroutine is either building or blocked here, it measures
+       // how long the sender was starved waiting on the receiver.
+       waited time.Duration
+}
+
+func newConfirmPipeline() *confirmPipeline { // empty pipeline with depth rowReplayMaxInflight
+       return &confirmPipeline{depth: rowReplayMaxInflight}
+}
+
+// add registers a batch confirmation that will be delivered on ch. When the
+// pipeline is already full it first waits for the oldest in-flight confirmation
+// so no more than depth batches are ever outstanding, returning that awaited
+// error (a non-nil error means an earlier batch must abort the part). ch is registered either way.
+func (p *confirmPipeline) add(ch chan error) error {
+       var awaited error
+       if len(p.inflight) >= p.depth {
+               start := time.Now()
+               awaited = <-p.inflight[0] // no ctx/timeout here; relies on the confirm goroutine eventually sending
+               p.waited += time.Since(start)
+               p.inflight = p.inflight[1:] // pop the oldest; ch is appended below regardless of awaited
+       }
+       p.inflight = append(p.inflight, ch)
+       return awaited
+}
+
+// drain waits for every remaining in-flight confirmation, in send order, and returns the
+// first failing error (or nil when all succeeded). It keeps waiting after a failure so no
+// batch is still in flight on return; a drained pipeline is empty, so calling it again is a no-op.
+func (p *confirmPipeline) drain() error {
+       var first error
+       for _, ch := range p.inflight {
+               start := time.Now()
+               out := <-ch // each ch receives exactly one value (the confirm goroutine's deferred send)
+               p.waited += time.Since(start)
+               if first == nil && out != nil {
+                       first = out
+               }
+       }
+       p.inflight = nil
+       return first
+}
+
+// batchSender buffers row-replay messages and, once a batch fills, sends it on a
+// client-streaming publisher and confirms it asynchronously through a bounded
+// confirmPipeline. Each batch is published on its own publisher (a fresh one is
+// rotated in immediately) and confirmed in the background, so one batch's
+// receiver-side write overlaps with building and sending the next. It owns the
+// whole publisher lifecycle (open, rotate, close) so the per-data-type replayers
+// only build messages and enqueue them. It is not safe for concurrent use: it
+// assumes a single driving goroutine (the part's replay loop) and has no buffer
+// locking. NOTE(review): the replayers it replaced guarded their batch with a
+// batchMu; confirm replayPart is never called concurrently on one replayer.
+type batchSender struct {
+       client    queue.Client         // opens a fresh publisher for every batch
+       publisher queue.BatchPublisher // idle publisher the next batch is sent on
+       pipeline  *confirmPipeline     // bounds sent-but-unconfirmed batches
+       batch     []bus.Message        // buffered messages not yet sent
+       topic     bus.Topic            // topic every batch is published to
+       timeout   time.Duration        // passed to client.NewBatchPublisher
+       batchSize int                  // flush threshold and buffer capacity
+}
+
+// newBatchSender builds a sender for one data type and eagerly opens its first
+// publisher. batchSize and timeout are per-type knobs each replayer
+// (measure/stream/trace) supplies from its own constants, reserved for per-type tuning.
+func newBatchSender(client queue.Client, topic bus.Topic, batchSize int, timeout time.Duration) *batchSender {
+       return &batchSender{
+               client:    client,
+               publisher: client.NewBatchPublisher(timeout),
+               pipeline:  newConfirmPipeline(),
+               topic:     topic,
+               batch:     make([]bus.Message, 0, batchSize),
+               timeout:   timeout,
+               batchSize: batchSize,
+       }
+}
+
+// enqueue buffers msg and, when the batch is full, sends and asynchronously
+// confirms it via flush. The returned error is flush's: an earlier batch's failure
+// surfaced by the in-flight bound, or this batch's own synchronous Publish error.
+func (s *batchSender) enqueue(ctx context.Context, msg bus.Message) error {
+       s.batch = append(s.batch, msg)
+       if len(s.batch) >= s.batchSize {
+               return s.flush(ctx)
+       }
+       return nil
+}
+
+// flush sends the buffered batch on the current publisher, rotates in a fresh
+// publisher for the next batch, and confirms the sent batch (closing its stream
+// to collect the per-node delivery result) in a background goroutine. The
+// returned error aborts the part promptly: an earlier batch's result when the
+// in-flight bound made it wait (it takes priority, preserving send order),
+// otherwise this batch's synchronous Publish error. It is a no-op when the buffer is empty.
+func (s *batchSender) flush(ctx context.Context) error {
+       pending := s.take()
+       if len(pending) == 0 {
+               return nil
+       }
+       _, pubErr := s.publisher.Publish(ctx, s.topic, pending...)
+       pub := s.publisher
+       s.publisher = s.client.NewBatchPublisher(s.timeout) // rotate even on pubErr so the next batch gets its own stream
+       ch := make(chan error, 1)
+       go func() { // sends exactly one value on ch; capacity 1 means that send never blocks
+               // The pipeline blocks in add/drain until exactly one value arrives on ch, so
+               // the send must happen on every path: a deferred send (with recover)
+               // guarantees it even if pub.Close panics, turning a would-be deadlock into a
+               // surfaced error.
+               var out error
+               defer func() {
+                       if rec := recover(); rec != nil {
+                               out = fmt.Errorf("row-replay batch confirm panicked: %v\n%s", rec, debug.Stack())
+                       }
+                       ch <- out
+               }()
+               cee, closeErr := pub.Close() // not bounded by ctx; presumably bounded by the publisher timeout — confirm
+               err := pubErr
+               if err == nil {
+                       err = closeErr
+               } else {
+                       cee = nil // publish failed: the batch is treated as undelivered, so per-node cee is ignored
+               }
+               out = newNodeReplayError(cee, err)
+       }()
+       // Abort promptly: an earlier in-flight batch's failure (via the bound) wins,
+       // otherwise this batch's own synchronous send error.
+       if awaited := s.pipeline.add(ch); awaited != nil {
+               return awaited
+       }
+       if pubErr != nil {
+               return newNodeReplayError(nil, pubErr) // the goroutine also delivers this pubErr on ch, so drain sees it again
+       }
+       return nil
+}
+
+// drain waits for every in-flight batch confirmation (delegating to the pipeline)
+// and returns the first failure in send order, or nil when all succeeded.
+func (s *batchSender) drain() error {
+       return s.pipeline.drain()
+}
+
+// rowCursor is the subset of a dump iterator the replay driver needs; the emit
+// closure reads the current row itself. *dump.Iterator[T] satisfies it for any T.
+type rowCursor interface {
+       Next() bool              // advances; replay checks Err once it returns false
+       Err() error              // iteration error, checked after the row loop ends
+       Position() dump.Position // current block/row index, used in abort logs
+}
+
+// replay drives one part's row-replay: it pulls rows from cur, sends each through
+// emit (which builds the message and enqueues it on this sender), flushes the
+// trailing batch, then drains the pipeline so the part is reported durable only
+// once every batch is confirmed. counter (when non-nil) counts completed parts,
+// and a per-part build_send vs confirm_wait timing line is logged on success.
+// The returned count is rows accepted by emit, not rows confirmed delivered; the error is
+// non-nil when any row build/route, iteration, batch send or confirmation failed.
+func (s *batchSender) replay(ctx context.Context, l *logger.Logger, group, part string, counter *uint64,
+       cur rowCursor, emit func() error,
+) (int, error) {
+       start := time.Now()
+       waited0 := s.pipeline.waited // waited is cumulative across parts; snapshot it to report only this part's share
+       warnAbort := func(rows int, err error) {
+               pos := cur.Position()
+               l.Warn().Err(err).
+                       Str("group", group).Str("part", part).
+                       Int("block_idx", pos.BlockIdx).Int("row_idx", pos.RowIdx).
+                       Int("rows_published", rows).
+                       Msg("row-replay aborted mid-part; will retry on resume")
+       }
+       rowCount := 0
+       var failure error
+       for cur.Next() {
+               if err := emit(); err != nil {
+                       warnAbort(rowCount, err)
+                       failure = err
+                       break
+               }
+               rowCount++
+       }
+       if failure == nil {
+               if iterErr := cur.Err(); iterErr != nil {
+                       warnAbort(rowCount, iterErr)
+                       failure = iterErr
+               } else {
+                       failure = s.flush(ctx) // flush failures are not logged here; warnAbort covers only emit and iteration errors
+               }
+       }
+       // Wait for every in-flight batch confirmation before returning so the part is
+       // only reported durable once all of its rows are acknowledged.
+       if drained := s.drain(); failure == nil { // drain always runs; its error is kept only when nothing failed earlier
+               failure = drained
+       }
+       if failure != nil {
+               // Drop the failed part's un-flushed residual rows (an abort skips the
+               // trailing flush) so neither the next part's flush nor close() can send rows
+               // the caller was told were not delivered; the whole part is re-replayed on resume.
+               s.discard()
+               return rowCount, failure
+       }
+       if counter != nil {
+               atomic.AddUint64(counter, 1)
+       }
+       logRowReplayTiming(l, group, part, rowCount, time.Since(start), s.pipeline.waited-waited0)
+       return rowCount, nil
+}
+
+// logRowReplayTiming emits one info line per replayed part (skipping empty parts),
+// splitting its wall time into build_send (total minus wait: reading, building,
+// routing and publishing rows) and confirm_wait (time blocked on the confirm pipeline).
+// A high confirm_wait points at the receiver; a high build_send points at the sender.
+func logRowReplayTiming(l *logger.Logger, group, part string, rows int, total, wait time.Duration) {
+       if rows == 0 {
+               return
+       }
+       buildSend := total - wait
+       if buildSend < 0 { // defensive clamp; wait is measured inside the same span as total
+               buildSend = 0
+       }
+       l.Info().
+               Str("group", group).
+               Str("part", part).
+               Int("rows", rows).
+               Dur("total", total).
+               Dur("build_send", buildSend).
+               Dur("confirm_wait", wait).
+               Msg("row-replay part timing")
+}
+
+// close waits for any outstanding confirmations and closes the idle publisher.
+// It never sends: the buffer is expected to be empty because every replay flushes
+// its trailing batch on success or discards it on failure.
+func (s *batchSender) close() (map[string]*common.Error, error) {
+       drained := s.drain()
+       cee, closeErr := s.publisher.Close() // closed even when drain failed; its result is then discarded
+       if drained != nil {
+               var nre *nodeReplayError
+               if errors.As(drained, &nre) { // unpack into the same (cee, err) shape publisher.Close returns
+                       return nre.cee, nre.err
+               }
+               return nil, drained
+       }
+       return cee, closeErr
+}
+
+// take swaps out the buffered batch (leaving a fresh, pre-sized buffer), returning nil when empty.
+func (s *batchSender) take() []bus.Message {
+       if len(s.batch) == 0 {
+               return nil
+       }
+       pending := s.batch
+       s.batch = make([]bus.Message, 0, s.batchSize) // fresh backing array so pending is never aliased by later appends
+       return pending
+}
+
+// discard drops any buffered rows not yet flushed, abandoning the residual of a
+// failed part so it is never sent. It is take() with the rows thrown away.
+func (s *batchSender) discard() {
+       s.take() // result intentionally ignored
+}
diff --git a/banyand/backup/lifecycle/row_replay_stream.go b/banyand/backup/lifecycle/row_replay_stream.go
index 9bcb3347c..92db82ec1 100644
--- a/banyand/backup/lifecycle/row_replay_stream.go
+++ b/banyand/backup/lifecycle/row_replay_stream.go
@@ -24,7 +24,6 @@ import (
        "path/filepath"
        "strconv"
        "sync"
-       "sync/atomic"
        "time"
 
        "google.golang.org/protobuf/types/known/timestamppb"
@@ -59,8 +58,7 @@ type cachedStreamSchema struct {
 // streamRowReplayer replays a group's stream parts row-by-row.
 type streamRowReplayer struct {
        selector       node.Selector
-       client         queue.Client
-       publisher      queue.BatchPublisher
+       sender         *batchSender
        metadata       metadata.Repo
        fs             fs.FileSystem
        logger         *logger.Logger
@@ -69,10 +67,8 @@ type streamRowReplayer struct {
        counter        *uint64
        group          string
        irPath         string
-       batch          []bus.Message
        schemaCacheMu  sync.Mutex
        irMu           sync.Mutex
-       batchMu        sync.Mutex
        targetShardNum uint32
 }
 
@@ -86,23 +82,19 @@ func newStreamRowReplayer(
                group:          group,
                targetShardNum: targetShardNum,
                selector:       selector,
-               client:         client,
-               publisher:      
client.NewBatchPublisher(streamReplayBatchTimeout),
+               sender:         newBatchSender(client, data.TopicStreamWrite, 
streamReplayBatchSize, streamReplayBatchTimeout),
                metadata:       md,
                fs:             fileSystem,
                logger:         l,
                schemaCache:    make(map[string]*cachedStreamSchema),
                counter:        counter,
-               batch:          make([]bus.Message, 0, streamReplayBatchSize),
        }
 }
 
-// Close flushes pending messages, closes the publisher and releases cached
-// IndexResolver handles.
+// Close drains any outstanding batch confirmations, closes the publisher and
+// releases cached IndexResolver handles.
 func (r *streamRowReplayer) Close() (map[string]*common.Error, error) {
-       flushCtx, cancel := context.WithTimeout(context.Background(), 
streamReplayBatchTimeout)
-       defer cancel()
-       flushErr := r.flushBatch(flushCtx)
+       cee, closeErr := r.sender.close()
        r.irMu.Lock()
        if r.irResolver != nil {
                _ = r.irResolver.Close()
@@ -110,25 +102,6 @@ func (r *streamRowReplayer) Close() (map[string]*common.Error, error) {
                r.irPath = ""
        }
        r.irMu.Unlock()
-       cee, closeErr := r.publisher.Close()
-       if flushErr != nil {
-               return cee, flushErr
-       }
-       return cee, closeErr
-}
-
-// flushAndConfirm flushes the buffered batch, closes the publisher to obtain 
the
-// per-node delivery result for everything sent since the last call, then 
opens a
-// fresh publisher for subsequent parts. The batch publisher is 
client-streaming,
-// so per-node errors are only observable once its stream closes; this lets a
-// row-replayed part be confirmed durable before it is marked completed.
-func (r *streamRowReplayer) flushAndConfirm(ctx context.Context) 
(map[string]*common.Error, error) {
-       flushErr := r.flushBatch(ctx)
-       cee, closeErr := r.publisher.Close()
-       r.publisher = r.client.NewBatchPublisher(streamReplayBatchTimeout)
-       if flushErr != nil {
-               return cee, flushErr
-       }
        return cee, closeErr
 }
 
@@ -198,38 +171,8 @@ func (r *streamRowReplayer) replayPart(ctx context.Context, partPath string) (in
 
        it := reader.Iterator()
        defer it.Close()
-
-       rowCount := 0
-       for it.Next() {
-               row := it.Row()
-               if rowErr := r.publishRow(ctx, row); rowErr != nil {
-                       pos := it.Position()
-                       r.logger.Warn().Err(rowErr).
-                               Str("group", r.group).
-                               Str("part", partPath).
-                               Int("block_idx", pos.BlockIdx).
-                               Int("row_idx", pos.RowIdx).
-                               Int("rows_published", rowCount).
-                               Msg("stream row-replay aborted mid-part on 
publish error; will retry on resume")
-                       return rowCount, rowErr
-               }
-               rowCount++
-       }
-       if iterErr := it.Err(); iterErr != nil {
-               pos := it.Position()
-               r.logger.Warn().Err(iterErr).
-                       Str("group", r.group).
-                       Str("part", partPath).
-                       Int("block_idx", pos.BlockIdx).
-                       Int("row_idx", pos.RowIdx).
-                       Int("rows_published", rowCount).
-                       Msg("stream row-replay aborted mid-part; will retry on 
resume")
-               return rowCount, iterErr
-       }
-       if r.counter != nil {
-               atomic.AddUint64(r.counter, 1)
-       }
-       return rowCount, nil
+       return r.sender.replay(ctx, r.logger, r.group, partPath, r.counter, it,
+               func() error { return r.publishRow(ctx, it.Row()) })
 }
 
 // buildWriteRequest reconstructs the WriteRequest + InternalWriteRequest pair
@@ -277,30 +220,5 @@ func (r *streamRowReplayer) publishRow(ctx 
context.Context, row dumpstream.Row)
                return fmt.Errorf("pick target node for %s: %w", 
wr.Metadata.Name, err)
        }
        msg := 
bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, iwr)
-       return r.enqueue(ctx, msg)
-}
-
-// enqueue appends a message to the batch and flushes when full.
-func (r *streamRowReplayer) enqueue(ctx context.Context, msg bus.Message) 
error {
-       r.batchMu.Lock()
-       r.batch = append(r.batch, msg)
-       shouldFlush := len(r.batch) >= streamReplayBatchSize
-       r.batchMu.Unlock()
-       if shouldFlush {
-               return r.flushBatch(ctx)
-       }
-       return nil
-}
-
-func (r *streamRowReplayer) flushBatch(ctx context.Context) error {
-       r.batchMu.Lock()
-       if len(r.batch) == 0 {
-               r.batchMu.Unlock()
-               return nil
-       }
-       pending := r.batch
-       r.batch = make([]bus.Message, 0, streamReplayBatchSize)
-       r.batchMu.Unlock()
-       _, err := r.publisher.Publish(ctx, data.TopicStreamWrite, pending...)
-       return err
+       return r.sender.enqueue(ctx, msg)
 }
diff --git a/banyand/backup/lifecycle/row_replay_test.go 
b/banyand/backup/lifecycle/row_replay_test.go
index 5aa2931b1..56ee9cb85 100644
--- a/banyand/backup/lifecycle/row_replay_test.go
+++ b/banyand/backup/lifecycle/row_replay_test.go
@@ -24,15 +24,18 @@ import (
        gofs "io/fs"
        "path/filepath"
        "strconv"
+       "sync/atomic"
        "testing"
        "time"
 
        "github.com/onsi/gomega"
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
+       "go.uber.org/mock/gomock"
        "google.golang.org/protobuf/proto"
        "google.golang.org/protobuf/types/known/timestamppb"
 
+       "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/api/data"
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
@@ -966,3 +969,550 @@ func findRoundtripPartDirs(root string) []string {
        })
        return dirs
 }
+
+// countingBatchPublisher is a queue.BatchPublisher fake that records how many
+// times it was published to and closed, and how many messages each Publish
+// carried. Close runs in the replayer's background confirmation goroutines, so
+// the counters are atomic. closeCee injects a per-node delivery failure,
+// publishErr injects a synchronous send failure, and panicOnClose makes Close
+// panic to exercise the confirm goroutine's panic-safety.
+type countingBatchPublisher struct {
+       closeCee      map[string]*common.Error
+       publishErr    error
+       panicOnClose  bool
+       publishCount  atomic.Int32
+       publishedRows atomic.Int32
+       closeCount    atomic.Int32
+}
+
+func (f *countingBatchPublisher) Publish(_ context.Context, _ bus.Topic, 
messages ...bus.Message) (bus.Future, error) {
+       f.publishCount.Add(1)
+       f.publishedRows.Add(int32(len(messages)))
+       return nil, f.publishErr
+}
+
+func (f *countingBatchPublisher) Close() (map[string]*common.Error, error) {
+       f.closeCount.Add(1)
+       if f.panicOnClose {
+               panic("simulated publisher close panic")
+       }
+       return f.closeCee, nil
+}
+
+// senderReturning wires a batchSender whose successive publishers are the 
given
+// ones in order, then fresh clean publishers once the list is exhausted. It
+// centralizes the mock-controller + NewBatchPublisher boilerplate and the
+// "first publisher carries the first batch" rotation order for the sender 
tests.
+// The returned accessor reports every publisher actually vended (provided and
+// fresh) — including appends that happen after this returns — so a test can
+// assert the exact number vended and inspect each, catching extra
+// rotations/publishes that a pre-sized expectation would miss.
+func senderReturning(t *testing.T, pubs ...*countingBatchPublisher) 
(*batchSender, func() []*countingBatchPublisher) {
+       t.Helper()
+       ctrl := gomock.NewController(t)
+       t.Cleanup(ctrl.Finish)
+       mockClient := queue.NewMockClient(ctrl)
+       var vended []*countingBatchPublisher
+       i := 0
+       mockClient.EXPECT().NewBatchPublisher(measureReplayBatchTimeout).
+               DoAndReturn(func(time.Duration) queue.BatchPublisher {
+                       var p *countingBatchPublisher
+                       if i < len(pubs) {
+                               p, i = pubs[i], i+1
+                       } else {
+                               p = &countingBatchPublisher{}
+                       }
+                       vended = append(vended, p)
+                       return p
+               }).AnyTimes()
+       return newBatchSender(mockClient, data.TopicMeasureWrite, 
measureReplayBatchSize, measureReplayBatchTimeout),
+               func() []*countingBatchPublisher { return vended }
+}
+
+// totalPublished sums the publish counts across the given publishers.
+func totalPublished(pubs []*countingBatchPublisher) int32 {
+       var total int32
+       for _, p := range pubs {
+               total += p.publishCount.Load()
+       }
+       return total
+}
+
+// totalPublishedRows sums the message counts actually handed to Publish across
+// the given publishers, i.e. how many source rows were really sent.
+func totalPublishedRows(pubs []*countingBatchPublisher) int32 {
+       var total int32
+       for _, p := range pubs {
+               total += p.publishedRows.Load()
+       }
+       return total
+}
+
+// cleanEmit returns a replay emit callback that enqueues one dummy row per 
call.
+func cleanEmit(s *batchSender) func() error {
+       return func() error {
+               return s.enqueue(context.TODO(),
+                       bus.NewBatchMessageWithNode(bus.MessageID(1), "node-1", 
&measurev1.InternalWriteRequest{}))
+       }
+}
+
+// failingEmit is cleanEmit that instead returns a build error on the failAt-th
+// call, aborting the part with whatever was buffered/sent so far.
+func failingEmit(s *batchSender, failAt int) func() error {
+       clean := cleanEmit(s)
+       calls := 0
+       return func() error {
+               calls++
+               if calls == failAt {
+                       return fmt.Errorf("build failed")
+               }
+               return clean()
+       }
+}
+
+// nodeErrCee extracts the per-node delivery errors carried by a failed replay
+// error, failing the test if err is nil or not a *nodeReplayError.
+func nodeErrCee(t *testing.T, err error) map[string]*common.Error {
+       t.Helper()
+       var nre *nodeReplayError
+       require.ErrorAs(t, err, &nre)
+       return nre.cee
+}
+
+// TestConfirmPipeline_BoundsInflightDepth proves the pipeline never keeps more
+// than depth confirmations outstanding: a (depth+1)th add blocks until the
+// oldest in-flight confirmation resolves, and drain surfaces the first 
failure.
+func TestConfirmPipeline_BoundsInflightDepth(t *testing.T) {
+       // rowReplayMaxInflight is 2, so a 3rd unconfirmed batch must wait.
+       p := newConfirmPipeline()
+       ch1 := make(chan error, 1)
+       ch2 := make(chan error, 1)
+       ch3 := make(chan error, 1)
+
+       // Within depth: add must not block and has nothing to surface yet.
+       require.NoError(t, p.add(ch1))
+       require.NoError(t, p.add(ch2))
+
+       // Full (2 in flight): the 3rd add must block until the oldest (ch1) 
resolves.
+       added := make(chan error, 1)
+       go func() { added <- p.add(ch3) }()
+       select {
+       case <-added:
+               t.Fatal("add exceeded the in-flight depth without waiting for 
the oldest confirmation")
+       case <-time.After(100 * time.Millisecond):
+       }
+       ch1 <- nil // oldest confirms cleanly
+       select {
+       case out := <-added:
+               require.NoError(t, out, "a clean oldest confirmation must not 
report failure")
+       case <-time.After(2 * time.Second):
+               t.Fatal("add did not unblock after the oldest confirmation 
resolved")
+       }
+
+       // drain waits for the rest and returns the first failing error.
+       ch2 <- nil
+       ch3 <- newNodeReplayError(map[string]*common.Error{"node-1": 
common.NewError("boom")}, nil)
+       require.Contains(t, nodeErrCee(t, p.drain()), "node-1")
+}
+
+// enqueueN feeds n messages through the sender, failing the test on any error
+// surfaced early by the in-flight bound.
+func enqueueN(t *testing.T, s *batchSender, n int) {
+       t.Helper()
+       emit := cleanEmit(s)
+       for i := 0; i < n; i++ {
+               require.NoError(t, emit())
+       }
+}
+
+// TestBatchSender_SendsRotatesAndConfirmsEachBatch verifies the sender flow:
+// every full batch is published once on its own publisher, the publisher is
+// rotated for the next batch, and each sent batch is confirmed (closed) 
exactly
+// once, with all confirmations drained before the part is considered durable.
+func TestBatchSender_SendsRotatesAndConfirmsEachBatch(t *testing.T) {
+       const batches = 3
+       s, vended := senderReturning(t)
+       enqueueN(t, s, batches*measureReplayBatchSize)
+       require.NoError(t, s.drain())
+
+       require.Len(t, vended(), batches+1, "exactly the initial publisher plus 
one rotation per sent batch")
+       for i := 0; i < batches; i++ {
+               require.Equalf(t, int32(1), vended()[i].publishCount.Load(), 
"publisher %d must send its batch once", i)
+               require.Equalf(t, int32(1), vended()[i].closeCount.Load(), 
"publisher %d must be confirmed (closed) once", i)
+       }
+       require.Equal(t, int32(0), vended()[batches].publishCount.Load(), "the 
staged publisher holds no batch yet")
+}
+
+// TestBatchSender_SurfacesBatchNodeError verifies a per-node delivery failure 
on
+// a confirmed batch propagates out of the sender (via the final drain) so the
+// caller can mark the part errored and retry it on resume.
+func TestBatchSender_SurfacesBatchNodeError(t *testing.T) {
+       failing := &countingBatchPublisher{closeCee: 
map[string]*common.Error{"node-1": common.NewError("flush timeout")}}
+       s, _ := senderReturning(t, failing) // the first publisher carries the 
batch and fails
+       enqueueN(t, s, measureReplayBatchSize)
+       require.Contains(t, nodeErrCee(t, s.drain()), "node-1",
+               "a batch's per-node delivery error must surface when drained")
+}
+
+// fakeCursor yields n rows for the replay driver, with a zero Position. err, 
when
+// set, is returned by Err() after the rows are exhausted to exercise the
+// iterator-error path.
+type fakeCursor struct {
+       err error
+       n   int
+       i   int
+}
+
+func (c *fakeCursor) Next() bool              { c.i++; return c.i <= c.n }
+func (c *fakeCursor) Err() error              { return c.err }
+func (c *fakeCursor) Position() dump.Position { return dump.Position{} }
+
+// TestBatchSender_SurfacesClosePanic pins the panic-safety of the confirm
+// goroutine: if a publisher's Close panics, the goroutine must still deliver a
+// (failed) error so the pipeline never deadlocks. Without the deferred
+// recover+send this would hang (or crash), so the bounded wait turns a
+// regression into a failed assertion rather than a hung test.
+func TestBatchSender_SurfacesClosePanic(t *testing.T) {
+       s, _ := senderReturning(t, &countingBatchPublisher{panicOnClose: true})
+       enqueueN(t, s, measureReplayBatchSize) // one full batch on the 
panicking publisher
+
+       done := make(chan error, 1)
+       go func() { done <- s.drain() }()
+       select {
+       case out := <-done:
+               require.ErrorContains(t, out, "panic", "a panicking Close must 
surface as an error")
+       case <-time.After(2 * time.Second):
+               t.Fatal("drain hung: the confirm goroutine did not deliver an 
error after Close panicked")
+       }
+}
+
+// TestConfirmPipeline_DrainKeepsFirstFailure pins that drain returns the 
EARLIEST
+// failing error, not a later one, when several in-flight batches fail.
+func TestConfirmPipeline_DrainKeepsFirstFailure(t *testing.T) {
+       p := newConfirmPipeline()
+       ch1 := make(chan error, 1)
+       ch2 := make(chan error, 1)
+       require.NoError(t, p.add(ch1))
+       require.NoError(t, p.add(ch2))
+
+       ch1 <- fmt.Errorf("first failure")
+       ch2 <- fmt.Errorf("second failure")
+       require.ErrorContains(t, p.drain(), "first failure", "drain must keep 
the earliest failure")
+}
+
+// TestBatchSender_SurfacesFailureViaInflightBound pins the realistic 
mid-stream
+// abort path: a batch's failure is surfaced through the in-flight bound's
+// add-await (i.e. out of enqueue), not only at the final drain.
+func TestBatchSender_SurfacesFailureViaInflightBound(t *testing.T) {
+       failing := &countingBatchPublisher{closeCee: 
map[string]*common.Error{"node-1": common.NewError("boom")}}
+       s, _ := senderReturning(t, failing) // batch 1 fails; later batches are 
clean
+
+       emit := cleanEmit(s)
+       var surfaced error
+       for i := 0; i < 3*measureReplayBatchSize; i++ {
+               if out := emit(); out != nil {
+                       surfaced = out
+               }
+       }
+       require.Contains(t, nodeErrCee(t, surfaced), "node-1",
+               "batch 1's failure must surface via the in-flight bound during 
enqueue, not only at drain")
+       s.drain()
+}
+
+// TestReplay_RowCountBoundaries pins the driver's batching across small and 
large
+// parts: for every part size the reported row count is exact, a successful 
part
+// (even an empty one) counts once, the number of published batches matches the
+// 2000-row boundary, and a subsequent close never publishes.
+func TestReplay_RowCountBoundaries(t *testing.T) {
+       cases := []struct {
+               name        string
+               rows        int
+               wantBatches int
+       }{
+               {"empty", 0, 0},
+               {"single_row", 1, 1},
+               {"just_under_one_batch", measureReplayBatchSize - 1, 1},
+               {"exactly_one_batch", measureReplayBatchSize, 1},
+               {"one_over_one_batch", measureReplayBatchSize + 1, 2},
+               {"exactly_two_batches", 2 * measureReplayBatchSize, 2},
+               {"two_batches_and_a_tail", 2*measureReplayBatchSize + 5, 3},
+       }
+       for _, tc := range cases {
+               t.Run(tc.name, func(t *testing.T) {
+                       s, vended := senderReturning(t)
+                       var counter uint64
+                       rows, err := s.replay(context.TODO(), 
logger.GetLogger("test-replayer"), "g", "p", &counter, &fakeCursor{n: tc.rows}, 
cleanEmit(s))
+                       require.NoError(t, err)
+                       require.Equal(t, tc.rows, rows, "rowCount must equal 
the source row count")
+                       require.Equal(t, uint64(1), counter, "every successful 
part counts once, even when empty")
+                       // Sum over EVERY vended publisher so an extra batch on 
a rotated-beyond
+                       // publisher is still counted, and pin the exact 
rotation count.
+                       require.Equal(t, int32(tc.wantBatches), 
totalPublished(vended()), "published batch count")
+                       require.Len(t, vended(), tc.wantBatches+1, "exactly one 
publisher per batch plus the staged one")
+                       // Every source row must reach Publish exactly once: no 
row dropped at a
+                       // batch boundary and none lost in the trailing tail.
+                       require.Equal(t, int32(tc.rows), 
totalPublishedRows(vended()), "every source row must be published exactly once")
+
+                       _, cerr := s.close()
+                       require.NoError(t, cerr)
+                       require.Equal(t, int32(tc.wantBatches), 
totalPublished(vended()), "close must not publish after a successful part")
+               })
+       }
+}
+
+// TestReplay_BatchPlusTailSendsEveryRow pins the "a few rows past a flush
+// boundary" case: with two full batches plus a short tail, each full batch
+// carries exactly batchSize messages, the tail batch carries the remainder, 
the
+// staged publisher carries nothing, and the grand total equals the row count —
+// so no row is dropped at a boundary nor lost in the trailing tail.
+func TestReplay_BatchPlusTailSendsEveryRow(t *testing.T) {
+       const tail = 5
+       const rows = 2*measureReplayBatchSize + tail
+       s, vended := senderReturning(t)
+       var counter uint64
+       got, err := s.replay(context.TODO(), logger.GetLogger("test-replayer"), 
"g", "p", &counter, &fakeCursor{n: rows}, cleanEmit(s))
+       require.NoError(t, err)
+       require.Equal(t, rows, got, "rowCount must equal the source row count")
+
+       pubs := vended()
+       require.Len(t, pubs, 4, "two full batches + the tail batch + the staged 
publisher")
+       require.Equal(t, int32(measureReplayBatchSize), 
pubs[0].publishedRows.Load(), "first full batch carries batchSize rows")
+       require.Equal(t, int32(measureReplayBatchSize), 
pubs[1].publishedRows.Load(), "second full batch carries batchSize rows")
+       require.Equal(t, int32(tail), pubs[2].publishedRows.Load(), "the 
trailing batch carries exactly the tail")
+       require.Equal(t, int32(0), pubs[3].publishedRows.Load(), "the staged 
publisher holds nothing")
+       require.Equal(t, int32(rows), totalPublishedRows(pubs), "every row sent 
exactly once")
+
+       _, cerr := s.close()
+       require.NoError(t, cerr)
+}
+
+// TestReplay_AbortDrainsInflightAndDiscards pins the abort path across small 
and
+// large parts and at the batch boundary: whatever full batches were already 
sent
+// are still confirmed (drained), the un-flushed residual is discarded (never
+// resurrected by a later close), the failed part is not counted, and no row 
of an
+// aborted part is published beyond the batches that left before the failure.
+func TestReplay_AbortDrainsInflightAndDiscards(t *testing.T) {
+       cases := []struct {
+               name            string
+               failAt          int // the 1-based emit call that returns a 
build error
+               wantRows        int // rows counted before the failure
+               wantSentBatches int // full batches flushed before the failure
+       }{
+               {"first_row_nothing_buffered", 1, 0, 0},
+               {"small_residual_no_batch", 2, 1, 0},
+               {"empty_residual_at_batch_boundary", measureReplayBatchSize + 
1, measureReplayBatchSize, 1},
+               {"residual_after_two_batches", 2*measureReplayBatchSize + 501, 
2*measureReplayBatchSize + 500, 2},
+       }
+       for _, tc := range cases {
+               t.Run(tc.name, func(t *testing.T) {
+                       s, vended := senderReturning(t)
+                       var counter uint64
+                       rows, err := s.replay(context.TODO(), 
logger.GetLogger("test-replayer"), "g", "p", &counter, &fakeCursor{n: 
tc.failAt}, failingEmit(s, tc.failAt))
+                       require.Error(t, err, "the part must report failure")
+                       require.Equal(t, tc.wantRows, rows, "rowCount counts 
the rows enqueued before the failure")
+                       require.Zero(t, counter, "a failed part must not be 
counted")
+
+                       require.Len(t, vended(), tc.wantSentBatches+1, "one 
publisher per sent batch plus the staged one")
+                       for i := 0; i < tc.wantSentBatches; i++ {
+                               require.Equalf(t, int32(1), 
vended()[i].closeCount.Load(), "sent batch %d must be confirmed (drained)", i)
+                       }
+                       require.Equal(t, int32(tc.wantSentBatches), 
totalPublished(vended()), "only the batches sent before the failure are 
published")
+                       require.Nil(t, s.take(), "the residual must be 
discarded")
+                       _, cerr := s.close()
+                       require.NoError(t, cerr)
+               })
+       }
+}
+
+// TestReplay_IteratorErrorDrainsAndDiscards covers the distinct abort branch 
where
+// the source iterator fails after yielding rows (cur.Err() != nil). The 
already-
+// sent batches must still be drained, the residual discarded, and the part 
failed
+// — so a truncated part is never committed as durable.
+func TestReplay_IteratorErrorDrainsAndDiscards(t *testing.T) {
+       s, vended := senderReturning(t)
+       const n = 2*measureReplayBatchSize + 300 // two full batches sent, 300 
buffered, then the iterator errors
+       var counter uint64
+       rows, err := s.replay(context.TODO(), 
logger.GetLogger("test-replayer"), "g", "p", &counter,
+               &fakeCursor{n: n, err: fmt.Errorf("iterator boom")}, 
cleanEmit(s))
+       require.ErrorContains(t, err, "iterator boom", "an iterator error must 
fail the part")
+       require.Equal(t, n, rows, "all rows read before the iterator error are 
counted")
+       require.Zero(t, counter, "a failed part must not be counted")
+
+       require.Len(t, vended(), 3, "two sent batches plus the staged one")
+       require.Equal(t, int32(1), vended()[0].closeCount.Load(), "sent batch 0 
must be drained")
+       require.Equal(t, int32(1), vended()[1].closeCount.Load(), "sent batch 1 
must be drained")
+       require.Equal(t, int32(0), vended()[2].publishCount.Load(), "the 
residual must not be published")
+       require.Nil(t, s.take(), "the residual must be discarded")
+}
+
+// TestReplay_NodeErrorViaInflightBoundDrivesAbort covers the composition of a
+// per-node confirm failure surfacing through the in-flight bound's add-await
+// while driven by replay (not raw enqueue): the part aborts mid-scan, 
in-flight
+// batches drain, and the residual is discarded.
+func TestReplay_NodeErrorViaInflightBoundDrivesAbort(t *testing.T) {
+       failing := &countingBatchPublisher{closeCee: 
map[string]*common.Error{"node-1": common.NewError("boom")}}
+       s, _ := senderReturning(t, failing) // batch 1's confirm fails
+       var counter uint64
+       rows, err := s.replay(context.TODO(), 
logger.GetLogger("test-replayer"), "g", "p", &counter,
+               &fakeCursor{n: 3 * measureReplayBatchSize}, cleanEmit(s))
+       require.Contains(t, nodeErrCee(t, err), "node-1", "the node error must 
abort the part")
+       require.Zero(t, counter, "a failed part must not be counted")
+       // Batch 1's failure surfaces only once a later batch's flush awaits it 
at the
+       // in-flight bound, so the part aborts mid-scan: after more than one 
batch's
+       // worth of rows but before consuming all of them.
+       require.Greater(t, rows, measureReplayBatchSize, "aborted after more 
than one batch")
+       require.Less(t, rows, 3*measureReplayBatchSize, "aborted mid-scan, not 
after consuming every row")
+       require.Nil(t, s.take(), "the residual must be discarded")
+       _, cerr := s.close()
+       require.NoError(t, cerr)
+}
+
+// TestRecordReplayNodeErrors_ReportsPartialFailure pins the partial-failure
+// report: the per-node delivery errors a failed batch carries are recorded 
into
+// the progress report keyed by node (even through fmt wrapping), while a
+// global-only failure and a plain non-nodeReplayError add no node entries — so
+// the resume report reflects exactly which nodes rejected rows and nothing 
else.
+func TestRecordReplayNodeErrors_ReportsPartialFailure(t *testing.T) {
+       require.NoError(t, logger.Init(logger.Logging{Env: "dev", Level: 
"warn"}))
+       l := logger.GetLogger("test")
+
+       t.Run("per_node_errors_recorded", func(t *testing.T) {
+               p := NewProgress(filepath.Join(t.TempDir(), "progress.json"), l)
+               err := newNodeReplayError(map[string]*common.Error{
+                       "node-2": common.NewError("flush timeout"),
+                       "node-5": common.NewError("shard rejected"),
+               }, nil)
+               recordReplayNodeErrors(p, "g", err)
+               require.Contains(t, p.RowReplayNodeErrors, "g")
+               require.Len(t, p.RowReplayNodeErrors["g"], 2, "exactly the 
failing nodes are reported")
+               require.Contains(t, p.RowReplayNodeErrors["g"]["node-2"], 
"flush timeout")
+               require.Contains(t, p.RowReplayNodeErrors["g"]["node-5"], 
"shard rejected")
+               require.NotContains(t, p.RowReplayNodeErrors["g"], "node-1", "a 
node that did not fail must not be reported")
+       })
+
+       t.Run("wrapped_error_still_unwrapped", func(t *testing.T) {
+               p := NewProgress(filepath.Join(t.TempDir(), "progress.json"), l)
+               inner := newNodeReplayError(map[string]*common.Error{"node-7": 
common.NewError("boom")}, nil)
+               recordReplayNodeErrors(p, "g", fmt.Errorf("row-replay measure 
part p: %w", inner))
+               require.Contains(t, p.RowReplayNodeErrors["g"], "node-7", 
"errors.As must find node errors through fmt wrapping")
+       })
+
+       t.Run("global_only_error_records_nothing", func(t *testing.T) {
+               p := NewProgress(filepath.Join(t.TempDir(), "progress.json"), l)
+               recordReplayNodeErrors(p, "g", newNodeReplayError(nil, 
fmt.Errorf("stream broke")))
+               require.NotContains(t, p.RowReplayNodeErrors, "g", "a 
global-only failure must not add node entries")
+       })
+
+       t.Run("plain_error_records_nothing", func(t *testing.T) {
+               p := NewProgress(filepath.Join(t.TempDir(), "progress.json"), l)
+               recordReplayNodeErrors(p, "g", fmt.Errorf("open part failed"))
+               require.NotContains(t, p.RowReplayNodeErrors, "g", "a 
non-nodeReplayError must not be reported as node failures")
+       })
+}
+
+// TestBatchSender_EachBatchGetsItsOwnTimeoutWindow pins the fix for the 
original
+// migration timeout bug: every batch is published on its OWN publisher opened
+// with a fresh copy of the configured timeout, so the window is per-batch. A 
slow
+// gap between two sends can never eat into the next batch's window the way a
+// single long-lived stream's lifetime once did. A 5s-timeout sender sends two
+// full batches; we assert a fresh NewBatchPublisher(5s) backs each batch 
(initial
+// publisher + one rotation per batch), every call uses the full 5s (not a 
shrunk
+// or shared window), and the two batches ride distinct publishers.
+func TestBatchSender_EachBatchGetsItsOwnTimeoutWindow(t *testing.T) {
+       const (
+               timeout   = 5 * time.Second
+               batchSize = 2
+       )
+       ctrl := gomock.NewController(t)
+       t.Cleanup(ctrl.Finish)
+       mockClient := queue.NewMockClient(ctrl)
+       var timeouts []time.Duration
+       var pubs []*countingBatchPublisher
+       mockClient.EXPECT().NewBatchPublisher(gomock.Any()).
+               DoAndReturn(func(d time.Duration) queue.BatchPublisher {
+                       timeouts = append(timeouts, d)
+                       p := &countingBatchPublisher{}
+                       pubs = append(pubs, p)
+                       return p
+               }).AnyTimes()
+
+       s := newBatchSender(mockClient, data.TopicMeasureWrite, batchSize, 
timeout)
+       for i := 0; i < 2*batchSize; i++ { // two full batches
+               require.NoError(t, s.enqueue(context.TODO(),
+                       bus.NewBatchMessageWithNode(bus.MessageID(1), "node-1", 
&measurev1.InternalWriteRequest{})))
+       }
+       require.NoError(t, s.drain())
+
+       // Initial publisher + one rotation per sent batch = 3 
NewBatchPublisher calls,
+       // and EVERY one must request the full configured timeout (a fresh 5s 
window),
+       // proving the two sends do not share one stream's single 5s lifetime.
+       require.Len(t, timeouts, 3, "each batch must open its own publisher, 
not share one")
+       for i, d := range timeouts {
+               require.Equalf(t, timeout, d, "publisher %d must open with the 
full configured timeout, not a shared/shrunk one", i)
+       }
+       require.Len(t, pubs, 3)
+       require.Equal(t, int32(1), pubs[0].publishCount.Load(), "batch 1 rides 
its own publisher")
+       require.Equal(t, int32(1), pubs[1].publishCount.Load(), "batch 2 rides 
a different publisher")
+       require.Equal(t, int32(0), pubs[2].publishCount.Load(), "the staged 
publisher holds nothing")
+       require.NotSame(t, pubs[0], pubs[1], "the two batches must not share a 
publisher (nor its timeout window)")
+
+       _, cerr := s.close()
+       require.NoError(t, cerr)
+}
+
+// TestNodeReplayError_ErrorAndUnwrap pins the error type directly: its two
+// message forms (global cause vs per-node count) and that Unwrap exposes the
+// global cause so errors.Is/As chains reach it even through fmt wrapping.
+func TestNodeReplayError_ErrorAndUnwrap(t *testing.T) {
+       // Global-error form: Error() is the wrapped cause, and Unwrap exposes 
it.
+       cause := fmt.Errorf("publish failed")
+       globalErr := newNodeReplayError(nil, cause)
+       require.EqualError(t, globalErr, "publish failed")
+       require.ErrorIs(t, globalErr, cause, "Unwrap must expose the global 
cause to errors.Is")
+       require.ErrorIs(t, fmt.Errorf("row-replay measure part p: %w", 
globalErr), cause,
+               "the global cause must stay reachable through fmt wrapping")
+
+       // Node-error form (cee only, no global err): Error() reports the node 
count
+       // and there is no global cause to unwrap.
+       nodeErr := newNodeReplayError(map[string]*common.Error{
+               "node-1": common.NewError("boom"),
+               "node-2": common.NewError("boom"),
+       }, nil)
+       require.EqualError(t, nodeErr, "2 node error(s)")
+       require.NotErrorIs(t, nodeErr, cause, "a node-only error has no global 
cause to unwrap")
+}
+
+// TestBatchSender_CloseSurfacesDrainedFailure covers close()'s failure branch:
+// when an in-flight batch failed, close() drains it and returns that batch's
+// per-node errors instead of the idle publisher's clean result.
+func TestBatchSender_CloseSurfacesDrainedFailure(t *testing.T) {
+       failing := &countingBatchPublisher{closeCee: 
map[string]*common.Error{"node-9": common.NewError("boom")}}
+       s, _ := senderReturning(t, failing)    // the first publisher carries 
the batch and fails
+       enqueueN(t, s, measureReplayBatchSize) // one full batch sent, left 
in-flight (undrained)
+       cee, err := s.close()                  // close() must drain it and 
surface the node error
+       require.NoError(t, err, "a node-only failure carries no global error")
+       require.Contains(t, cee, "node-9", "close must return the drained 
batch's per-node errors")
+}
+
+// TestReplay_SurfacesSynchronousPublishErrorPromptly pins that a synchronous
+// Publish failure aborts the part on the flush that triggered it — not 
deferred
+// until the in-flight bound or the final drain. With the failing publisher
+// carrying the first batch, replay must stop right after that batch instead of
+// scanning ~depth more batches: the old code returned the error only via the
+// bound, so it would have consumed ~3 batches before aborting.
+func TestReplay_SurfacesSynchronousPublishErrorPromptly(t *testing.T) {
+       pub := &countingBatchPublisher{publishErr: fmt.Errorf("stream send 
broke")}
+       s, _ := senderReturning(t, pub) // the first batch's Publish fails 
synchronously
+       var counter uint64
+       rows, err := s.replay(context.TODO(), 
logger.GetLogger("test-replayer"), "g", "p", &counter,
+               &fakeCursor{n: 3 * measureReplayBatchSize}, cleanEmit(s))
+       require.ErrorContains(t, err, "stream send broke", "a synchronous 
Publish error must abort the part")
+       require.Zero(t, counter, "a failed part must not be counted")
+       // Row 2000's emit triggers the failing flush and returns its error, so 
the loop
+       // breaks before counting it: exactly one batch worth minus the failing 
row, far
+       // short of the 3 batches the deferred (bound-only) behavior would have 
scanned.
+       require.Equal(t, measureReplayBatchSize-1, rows, "replay must abort on 
the failing flush, not keep scanning")
+       require.Nil(t, s.take(), "the residual must be discarded")
+       _, cerr := s.close()
+       require.NoError(t, cerr)
+}
diff --git a/banyand/backup/lifecycle/row_replay_trace.go 
b/banyand/backup/lifecycle/row_replay_trace.go
index 4d4f85704..937eb012d 100644
--- a/banyand/backup/lifecycle/row_replay_trace.go
+++ b/banyand/backup/lifecycle/row_replay_trace.go
@@ -22,8 +22,6 @@ import (
        "fmt"
        "path/filepath"
        "strconv"
-       "sync"
-       "sync/atomic"
        "time"
 
        "github.com/apache/skywalking-banyandb/api/common"
@@ -53,16 +51,13 @@ const (
 // returns an error.
 type traceRowReplayer struct {
        selector       node.Selector
-       client         queue.Client
-       publisher      queue.BatchPublisher
+       sender         *batchSender
        fs             fs.FileSystem
        logger         *logger.Logger
        schema         *databasev1.Trace
        traceName      string
        counter        *uint64
        group          string
-       batch          []bus.Message
-       batchMu        sync.Mutex
        targetShardNum uint32
 }
 
@@ -88,42 +83,18 @@ func newTraceRowReplayer(
                group:          group,
                targetShardNum: targetShardNum,
                selector:       selector,
-               client:         client,
-               publisher:      
client.NewBatchPublisher(traceReplayBatchTimeout),
+               sender:         newBatchSender(client, data.TopicTraceWrite, 
traceReplayBatchSize, traceReplayBatchTimeout),
                fs:             fileSystem,
                logger:         l,
                schema:         t,
                traceName:      t.Metadata.Name,
                counter:        counter,
-               batch:          make([]bus.Message, 0, traceReplayBatchSize),
        }, nil
 }
 
-// Close flushes pending messages and closes the publisher.
+// Close drains any outstanding batch confirmations and closes the publisher.
 func (r *traceRowReplayer) Close() (map[string]*common.Error, error) {
-       flushCtx, cancel := context.WithTimeout(context.Background(), 
traceReplayBatchTimeout)
-       defer cancel()
-       flushErr := r.flushBatch(flushCtx)
-       cee, closeErr := r.publisher.Close()
-       if flushErr != nil {
-               return cee, flushErr
-       }
-       return cee, closeErr
-}
-
-// flushAndConfirm flushes the buffered batch, closes the publisher to obtain 
the
-// per-node delivery result for everything sent since the last call, then 
opens a
-// fresh publisher for subsequent shards. The batch publisher is 
client-streaming,
-// so per-node errors are only observable once its stream closes; this lets a
-// row-replayed shard be confirmed durable before it is marked completed.
-func (r *traceRowReplayer) flushAndConfirm(ctx context.Context) 
(map[string]*common.Error, error) {
-       flushErr := r.flushBatch(ctx)
-       cee, closeErr := r.publisher.Close()
-       r.publisher = r.client.NewBatchPublisher(traceReplayBatchTimeout)
-       if flushErr != nil {
-               return cee, flushErr
-       }
-       return cee, closeErr
+       return r.sender.close()
 }
 
 func (r *traceRowReplayer) replayPart(ctx context.Context, partPath string) 
(int, error) {
@@ -141,40 +112,10 @@ func (r *traceRowReplayer) replayPart(ctx 
context.Context, partPath string) (int
 
        it := reader.Iterator()
        defer it.Close()
-
-       rowCount := 0
-       for it.Next() {
-               row := it.Row()
-               if rowErr := r.publishRow(ctx, row); rowErr != nil {
-                       pos := it.Position()
-                       r.logger.Warn().Err(rowErr).
-                               Str("group", r.group).
-                               Str("trace", r.traceName).
-                               Str("part", partPath).
-                               Int("block_idx", pos.BlockIdx).
-                               Int("row_idx", pos.RowIdx).
-                               Int("rows_published", rowCount).
-                               Msg("trace row-replay aborted mid-part on 
publish error; will retry on resume")
-                       return rowCount, rowErr
-               }
-               rowCount++
-       }
-       if iterErr := it.Err(); iterErr != nil {
-               pos := it.Position()
-               r.logger.Warn().Err(iterErr).
-                       Str("group", r.group).
-                       Str("trace", r.traceName).
-                       Str("part", partPath).
-                       Int("block_idx", pos.BlockIdx).
-                       Int("row_idx", pos.RowIdx).
-                       Int("rows_published", rowCount).
-                       Msg("trace row-replay aborted mid-part; will retry on 
resume")
-               return rowCount, iterErr
-       }
-       if r.counter != nil {
-               atomic.AddUint64(r.counter, 1)
-       }
-       return rowCount, nil
+       // Prefix the part label with the trace name so the shared driver's 
timing and
+       // abort logs keep the trace identity the old per-type logs carried.
+       return r.sender.replay(ctx, r.logger, r.group, 
r.traceName+"/"+partPath, r.counter, it,
+               func() error { return r.publishRow(ctx, it.Row()) })
 }
 
 // buildWriteRequest reconstructs the WriteRequest + InternalWriteRequest pair
@@ -204,30 +145,5 @@ func (r *traceRowReplayer) publishRow(ctx context.Context, 
row dumptrace.Row) er
                return fmt.Errorf("pick target node for trace %s: %w", 
r.traceName, err)
        }
        msg := 
bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, iwr)
-       return r.enqueue(ctx, msg)
-}
-
-// enqueue appends a message to the batch and flushes when full.
-func (r *traceRowReplayer) enqueue(ctx context.Context, msg bus.Message) error 
{
-       r.batchMu.Lock()
-       r.batch = append(r.batch, msg)
-       shouldFlush := len(r.batch) >= traceReplayBatchSize
-       r.batchMu.Unlock()
-       if shouldFlush {
-               return r.flushBatch(ctx)
-       }
-       return nil
-}
-
-func (r *traceRowReplayer) flushBatch(ctx context.Context) error {
-       r.batchMu.Lock()
-       if len(r.batch) == 0 {
-               r.batchMu.Unlock()
-               return nil
-       }
-       pending := r.batch
-       r.batch = make([]bus.Message, 0, traceReplayBatchSize)
-       r.batchMu.Unlock()
-       _, err := r.publisher.Publish(ctx, data.TopicTraceWrite, pending...)
-       return err
+       return r.sender.enqueue(ctx, msg)
 }
diff --git a/banyand/backup/lifecycle/stream_migration_visitor.go 
b/banyand/backup/lifecycle/stream_migration_visitor.go
index a229ae076..c0aa6f69c 100644
--- a/banyand/backup/lifecycle/stream_migration_visitor.go
+++ b/banyand/backup/lifecycle/stream_migration_visitor.go
@@ -382,30 +382,18 @@ func (mv *streamMigrationVisitor) visitPartRowReplay(ctx 
context.Context, segmen
                Int("target_segments_count", len(targetSegments)).
                Str("group", mv.group).
                Msg("stream part spans multiple target segments; switching to 
row-replay")
+       // replayPart sends and confirms the part's rows through the bounded 
pipeline,
+       // draining all in-flight confirmations before returning. A non-nil 
error means
+       // some rows were not durably delivered; marking the part errored 
(rather than
+       // completed) ensures the resume guard re-replays the whole part.
        rowCount, err := replayer.replayPart(ctx, partPath)
        if err != nil {
                // Row-replay is all-or-nothing per part; mark the source part 
errored so
                // resume retries the whole part (same source key the guard 
checks above).
+               recordReplayNodeErrors(mv.progress, mv.group, err)
                mv.progress.MarkStreamPartError(mv.group, sourceSegmentIDStr, 
sourceShardID, partID, err.Error())
                return fmt.Errorf("row-replay stream part %s: %w", partPath, 
err)
        }
-       // Confirm this part's rows reached every node before marking it 
completed.
-       // replayPart only enqueues; the batch publisher is client-streaming so
-       // per-node errors surface only when its stream closes, so 
flushAndConfirm
-       // closes the publisher to collect that result (then opens a fresh one 
for the
-       // next part). Marking before this confirmation could report success 
for rows
-       // a flush failure never delivered, and the resume guard would then 
skip the
-       // part, losing data.
-       cee, flushErr := replayer.flushAndConfirm(ctx)
-       if flushErr != nil || len(cee) > 0 {
-               mv.progress.RecordRowReplayNodeErrors(mv.group, cee)
-               confirmErr := flushErr
-               if confirmErr == nil {
-                       confirmErr = fmt.Errorf("%d node error(s)", len(cee))
-               }
-               mv.progress.MarkStreamPartError(mv.group, sourceSegmentIDStr, 
sourceShardID, partID, confirmErr.Error())
-               return fmt.Errorf("confirm row-replay stream part %s: %w", 
partPath, confirmErr)
-       }
        mv.progress.MarkStreamPartCompleted(mv.group, sourceSegmentIDStr, 
sourceShardID, partID)
        mv.progress.MarkSourceStreamPartCompleted(mv.group, partPath, 
sourceShardID, partID)
        mv.progress.AddStreamRowReplay(mv.group, rowCount)
diff --git a/banyand/backup/lifecycle/trace_migration_visitor.go 
b/banyand/backup/lifecycle/trace_migration_visitor.go
index b033b4d42..933ffe4e1 100644
--- a/banyand/backup/lifecycle/trace_migration_visitor.go
+++ b/banyand/backup/lifecycle/trace_migration_visitor.go
@@ -385,32 +385,20 @@ func (mv *traceMigrationVisitor) visitShardRowReplay(ctx 
context.Context, source
                        continue
                }
                partPath := filepath.Join(shardPath, name)
-               rowCount, replayErr := replayer.replayPart(ctx, partPath)
-               if replayErr != nil {
-                       mv.progress.MarkTraceShardError(mv.group, segmentIDStr, 
sourceShardID, replayErr.Error())
-                       return fmt.Errorf("row-replay trace part %s: %w", 
partPath, replayErr)
+               // replayPart sends and confirms the part's rows through the 
bounded pipeline,
+               // draining all in-flight confirmations before returning. A 
non-nil error
+               // marks the whole shard errored so resume re-replays it, never 
skipping a
+               // shard whose rows were not durably delivered.
+               rowCount, partErr := replayer.replayPart(ctx, partPath)
+               if partErr != nil {
+                       recordReplayNodeErrors(mv.progress, mv.group, partErr)
+                       mv.progress.MarkTraceShardError(mv.group, segmentIDStr, 
sourceShardID, partErr.Error())
+                       return fmt.Errorf("row-replay trace part %s: %w", 
partPath, partErr)
                }
                totalRows += rowCount
                partsReplayed++
        }
 
-       // Confirm this shard's rows reached every node before marking it 
completed.
-       // replayPart only enqueues; the batch publisher is client-streaming so
-       // per-node errors surface only when its stream closes, so 
flushAndConfirm
-       // closes the publisher to collect that result (then opens a fresh one 
for the
-       // next shard). Marking before this confirmation could report success 
for rows
-       // a flush failure never delivered, and the resume guard would then 
skip the
-       // whole shard, losing data.
-       cee, flushErr := replayer.flushAndConfirm(ctx)
-       if flushErr != nil || len(cee) > 0 {
-               mv.progress.RecordRowReplayNodeErrors(mv.group, cee)
-               confirmErr := flushErr
-               if confirmErr == nil {
-                       confirmErr = fmt.Errorf("%d node error(s)", len(cee))
-               }
-               mv.progress.MarkTraceShardError(mv.group, segmentIDStr, 
sourceShardID, confirmErr.Error())
-               return fmt.Errorf("confirm row-replay trace shard %s: %w", 
shardPath, confirmErr)
-       }
        mv.progress.MarkTraceShardCompleted(mv.group, segmentIDStr, 
sourceShardID)
        mv.progress.MarkSourceTraceShardCompleted(mv.group, segmentIDStr, 
sourceShardID)
        mv.progress.AddTraceRowReplay(mv.group, partsReplayed, totalRows)

Reply via email to