hanahmily commented on code in PR #1154: URL: https://github.com/apache/skywalking-banyandb/pull/1154#discussion_r3353615103
########## banyand/backup/lifecycle/row_replay_pipeline.go: ########## @@ -0,0 +1,357 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package lifecycle + +import ( + "context" + "errors" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/banyand/internal/dump" + "github.com/apache/skywalking-banyandb/banyand/queue" + "github.com/apache/skywalking-banyandb/pkg/bus" + "github.com/apache/skywalking-banyandb/pkg/logger" +) + +// rowReplayMaxInflight bounds how many row-replay batches may be sent but not +// yet confirmed at once. A depth of 2 lets one batch's receiver-side write +// overlap with the building and sending of the next batch (double buffering) +// while capping the extra sender/receiver memory and write contention. +const rowReplayMaxInflight = 2 + +// nodeReplayError reports a failed row-replay batch. It distinguishes a global +// send/confirm failure (err: the whole publish/close failed, or a row could not +// be built or routed) from per-node delivery failures (cee: the receiver +// rejected rows on specific nodes, keyed by node). 
At least one is set whenever +// this error is returned; newNodeReplayError yields nil when neither is. +type nodeReplayError struct { + err error + cee map[string]*common.Error +} + +func (e *nodeReplayError) Error() string { + if e.err != nil { + return e.err.Error() + } + return fmt.Sprintf("%d node error(s)", len(e.cee)) +} + +func (e *nodeReplayError) Unwrap() error { + return e.err +} + +// newNodeReplayError reduces a confirmed batch's result to an error: nil when +// the batch fully succeeded (no global error and no per-node errors), otherwise +// a *nodeReplayError carrying whichever failures occurred. +func newNodeReplayError(cee map[string]*common.Error, err error) error { + if err == nil && len(cee) == 0 { + return nil + } + return &nodeReplayError{cee: cee, err: err} +} + +// recordReplayNodeErrors records any per-node delivery errors carried by err +// into progress so they surface in the migration report. It is a no-op for a +// global-only error. +func recordReplayNodeErrors(progress *Progress, group string, err error) { + var nre *nodeReplayError + if errors.As(err, &nre) && len(nre.cee) > 0 { + progress.RecordRowReplayNodeErrors(group, nre.cee) + } +} + +// confirmPipeline bounds the number of in-flight (sent but unconfirmed) +// row-replay batches. Each batch is confirmed on its own client-streaming +// publisher in a background goroutine that delivers its error (nil on success) +// on a channel; the pipeline keeps those channels in FIFO order so +// confirmations are observed in send order. It is driven by a single goroutine +// (the part's replay loop) and needs no internal locking. +type confirmPipeline struct { + inflight []chan error + depth int + // waited accumulates, over the pipeline's lifetime, the wall time the driving + // goroutine spent blocked on confirmations (the in-flight bound in add plus + // drain). 
Since that goroutine is either building or blocked here, it measures + // how long the sender was starved waiting on the receiver. + waited time.Duration +} + +func newConfirmPipeline() *confirmPipeline { + return &confirmPipeline{depth: rowReplayMaxInflight} +} + +// add registers a batch confirmation that will be delivered on ch. When the +// pipeline is already full it first waits for the oldest in-flight confirmation +// so no more than depth batches are ever outstanding, returning that awaited +// error (a non-nil error means an earlier batch must abort the part). +func (p *confirmPipeline) add(ch chan error) error { + var awaited error + if len(p.inflight) >= p.depth { + start := time.Now() + awaited = <-p.inflight[0] + p.waited += time.Since(start) + p.inflight = p.inflight[1:] + } + p.inflight = append(p.inflight, ch) + return awaited +} + +// drain waits for every remaining in-flight confirmation and returns the first +// failing error (or nil when all succeeded). It is safe to call more than once; +// a drained pipeline is empty. +func (p *confirmPipeline) drain() error { + var first error + for _, ch := range p.inflight { + start := time.Now() + out := <-ch + p.waited += time.Since(start) + if first == nil && out != nil { + first = out + } + } + p.inflight = nil + return first +} + +// batchSender buffers row-replay messages and, once a batch fills, sends it on a +// client-streaming publisher and confirms it asynchronously through a bounded +// confirmPipeline. Each batch is published on its own publisher (rotated in +// immediately) and confirmed in the background, so one batch's receiver-side +// write overlaps with building and sending the next. It owns the whole publisher +// lifecycle (open, rotate, close) so the per-data-type replayers only build +// messages and enqueue them. batchSize and timeout are supplied by each data +// type. 
A sender is driven by a single goroutine; only the buffer is +// mutex-guarded, while each background confirmation touches just its own +// captured publisher and channel. +type batchSender struct { + client queue.Client + publisher queue.BatchPublisher + pipeline *confirmPipeline + batch []bus.Message + topic bus.Topic + timeout time.Duration + batchSize int + mu sync.Mutex +} + +// newBatchSender builds a sender for one data type. batchSize and timeout are +// per-type knobs each replayer (measure/stream/trace) supplies from its own +// constants; they coincide at 2000/30s today, which is why unparam is silenced. +// +//nolint:unparam Review Comment: Suppressing `unparam` because all three callers today pass the same `batchSize`/`timeout` constants is a code smell. If the parameters are truly always the same, make them constants inside `newBatchSender` and drop the parameters entirely. If the intent is to support different values per type in the future, a brief `// reserved for per-type tuning` comment is clearer than a linter suppression that silently hides the fact. ########## banyand/backup/lifecycle/row_replay_pipeline.go: ########## @@ -0,0 +1,357 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package lifecycle + +import ( + "context" + "errors" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/banyand/internal/dump" + "github.com/apache/skywalking-banyandb/banyand/queue" + "github.com/apache/skywalking-banyandb/pkg/bus" + "github.com/apache/skywalking-banyandb/pkg/logger" +) + +// rowReplayMaxInflight bounds how many row-replay batches may be sent but not +// yet confirmed at once. A depth of 2 lets one batch's receiver-side write +// overlap with the building and sending of the next batch (double buffering) +// while capping the extra sender/receiver memory and write contention. +const rowReplayMaxInflight = 2 + +// nodeReplayError reports a failed row-replay batch. It distinguishes a global +// send/confirm failure (err: the whole publish/close failed, or a row could not +// be built or routed) from per-node delivery failures (cee: the receiver +// rejected rows on specific nodes, keyed by node). At least one is set whenever +// this error is returned; newNodeReplayError yields nil when neither is. +type nodeReplayError struct { + err error + cee map[string]*common.Error +} + +func (e *nodeReplayError) Error() string { + if e.err != nil { + return e.err.Error() + } + return fmt.Sprintf("%d node error(s)", len(e.cee)) +} + +func (e *nodeReplayError) Unwrap() error { + return e.err +} + +// newNodeReplayError reduces a confirmed batch's result to an error: nil when +// the batch fully succeeded (no global error and no per-node errors), otherwise +// a *nodeReplayError carrying whichever failures occurred. 
+func newNodeReplayError(cee map[string]*common.Error, err error) error { + if err == nil && len(cee) == 0 { + return nil + } + return &nodeReplayError{cee: cee, err: err} +} + +// recordReplayNodeErrors records any per-node delivery errors carried by err +// into progress so they surface in the migration report. It is a no-op for a +// global-only error. +func recordReplayNodeErrors(progress *Progress, group string, err error) { + var nre *nodeReplayError + if errors.As(err, &nre) && len(nre.cee) > 0 { + progress.RecordRowReplayNodeErrors(group, nre.cee) + } +} + +// confirmPipeline bounds the number of in-flight (sent but unconfirmed) +// row-replay batches. Each batch is confirmed on its own client-streaming +// publisher in a background goroutine that delivers its error (nil on success) +// on a channel; the pipeline keeps those channels in FIFO order so +// confirmations are observed in send order. It is driven by a single goroutine +// (the part's replay loop) and needs no internal locking. +type confirmPipeline struct { + inflight []chan error + depth int + // waited accumulates, over the pipeline's lifetime, the wall time the driving + // goroutine spent blocked on confirmations (the in-flight bound in add plus + // drain). Since that goroutine is either building or blocked here, it measures + // how long the sender was starved waiting on the receiver. + waited time.Duration +} + +func newConfirmPipeline() *confirmPipeline { + return &confirmPipeline{depth: rowReplayMaxInflight} +} + +// add registers a batch confirmation that will be delivered on ch. When the +// pipeline is already full it first waits for the oldest in-flight confirmation +// so no more than depth batches are ever outstanding, returning that awaited +// error (a non-nil error means an earlier batch must abort the part). 
+func (p *confirmPipeline) add(ch chan error) error { + var awaited error + if len(p.inflight) >= p.depth { + start := time.Now() + awaited = <-p.inflight[0] + p.waited += time.Since(start) + p.inflight = p.inflight[1:] + } + p.inflight = append(p.inflight, ch) + return awaited +} + +// drain waits for every remaining in-flight confirmation and returns the first +// failing error (or nil when all succeeded). It is safe to call more than once; +// a drained pipeline is empty. +func (p *confirmPipeline) drain() error { + var first error + for _, ch := range p.inflight { + start := time.Now() + out := <-ch + p.waited += time.Since(start) + if first == nil && out != nil { + first = out + } + } + p.inflight = nil + return first +} + +// batchSender buffers row-replay messages and, once a batch fills, sends it on a +// client-streaming publisher and confirms it asynchronously through a bounded +// confirmPipeline. Each batch is published on its own publisher (rotated in +// immediately) and confirmed in the background, so one batch's receiver-side +// write overlaps with building and sending the next. It owns the whole publisher +// lifecycle (open, rotate, close) so the per-data-type replayers only build +// messages and enqueue them. batchSize and timeout are supplied by each data +// type. A sender is driven by a single goroutine; only the buffer is +// mutex-guarded, while each background confirmation touches just its own +// captured publisher and channel. +type batchSender struct { + client queue.Client + publisher queue.BatchPublisher + pipeline *confirmPipeline + batch []bus.Message + topic bus.Topic + timeout time.Duration + batchSize int + mu sync.Mutex Review Comment: `batchSender` is documented as "driven by a single goroutine" yet carries a `sync.Mutex`. The replay loop calls `take()`/`enqueue()` exclusively, and background confirm goroutines only touch their own captured `pub`/`ch` — they never access `s.batch`. 
The mutex is not protecting against any actual concurrent access. Either remove it (and `take()`'s lock/unlock), or add a comment explaining what concurrent scenario it guards against. ########## banyand/backup/lifecycle/row_replay_pipeline.go: ########## @@ -0,0 +1,357 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package lifecycle + +import ( + "context" + "errors" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/banyand/internal/dump" + "github.com/apache/skywalking-banyandb/banyand/queue" + "github.com/apache/skywalking-banyandb/pkg/bus" + "github.com/apache/skywalking-banyandb/pkg/logger" +) + +// rowReplayMaxInflight bounds how many row-replay batches may be sent but not +// yet confirmed at once. A depth of 2 lets one batch's receiver-side write +// overlap with the building and sending of the next batch (double buffering) +// while capping the extra sender/receiver memory and write contention. +const rowReplayMaxInflight = 2 + +// nodeReplayError reports a failed row-replay batch. 
It distinguishes a global +// send/confirm failure (err: the whole publish/close failed, or a row could not +// be built or routed) from per-node delivery failures (cee: the receiver +// rejected rows on specific nodes, keyed by node). At least one is set whenever +// this error is returned; newNodeReplayError yields nil when neither is. +type nodeReplayError struct { + err error + cee map[string]*common.Error +} + +func (e *nodeReplayError) Error() string { + if e.err != nil { + return e.err.Error() + } + return fmt.Sprintf("%d node error(s)", len(e.cee)) +} + +func (e *nodeReplayError) Unwrap() error { + return e.err +} + +// newNodeReplayError reduces a confirmed batch's result to an error: nil when +// the batch fully succeeded (no global error and no per-node errors), otherwise +// a *nodeReplayError carrying whichever failures occurred. +func newNodeReplayError(cee map[string]*common.Error, err error) error { + if err == nil && len(cee) == 0 { + return nil + } + return &nodeReplayError{cee: cee, err: err} +} + +// recordReplayNodeErrors records any per-node delivery errors carried by err +// into progress so they surface in the migration report. It is a no-op for a +// global-only error. +func recordReplayNodeErrors(progress *Progress, group string, err error) { + var nre *nodeReplayError + if errors.As(err, &nre) && len(nre.cee) > 0 { + progress.RecordRowReplayNodeErrors(group, nre.cee) + } +} + +// confirmPipeline bounds the number of in-flight (sent but unconfirmed) +// row-replay batches. Each batch is confirmed on its own client-streaming +// publisher in a background goroutine that delivers its error (nil on success) +// on a channel; the pipeline keeps those channels in FIFO order so +// confirmations are observed in send order. It is driven by a single goroutine +// (the part's replay loop) and needs no internal locking. 
+type confirmPipeline struct { + inflight []chan error + depth int + // waited accumulates, over the pipeline's lifetime, the wall time the driving + // goroutine spent blocked on confirmations (the in-flight bound in add plus + // drain). Since that goroutine is either building or blocked here, it measures + // how long the sender was starved waiting on the receiver. + waited time.Duration +} + +func newConfirmPipeline() *confirmPipeline { + return &confirmPipeline{depth: rowReplayMaxInflight} +} + +// add registers a batch confirmation that will be delivered on ch. When the +// pipeline is already full it first waits for the oldest in-flight confirmation +// so no more than depth batches are ever outstanding, returning that awaited +// error (a non-nil error means an earlier batch must abort the part). +func (p *confirmPipeline) add(ch chan error) error { + var awaited error + if len(p.inflight) >= p.depth { + start := time.Now() + awaited = <-p.inflight[0] + p.waited += time.Since(start) + p.inflight = p.inflight[1:] + } + p.inflight = append(p.inflight, ch) + return awaited +} + +// drain waits for every remaining in-flight confirmation and returns the first +// failing error (or nil when all succeeded). It is safe to call more than once; +// a drained pipeline is empty. +func (p *confirmPipeline) drain() error { + var first error + for _, ch := range p.inflight { + start := time.Now() + out := <-ch + p.waited += time.Since(start) + if first == nil && out != nil { + first = out + } + } + p.inflight = nil + return first +} + +// batchSender buffers row-replay messages and, once a batch fills, sends it on a +// client-streaming publisher and confirms it asynchronously through a bounded +// confirmPipeline. Each batch is published on its own publisher (rotated in +// immediately) and confirmed in the background, so one batch's receiver-side +// write overlaps with building and sending the next. 
It owns the whole publisher +// lifecycle (open, rotate, close) so the per-data-type replayers only build +// messages and enqueue them. batchSize and timeout are supplied by each data +// type. A sender is driven by a single goroutine; only the buffer is +// mutex-guarded, while each background confirmation touches just its own +// captured publisher and channel. +type batchSender struct { + client queue.Client + publisher queue.BatchPublisher + pipeline *confirmPipeline + batch []bus.Message + topic bus.Topic + timeout time.Duration + batchSize int + mu sync.Mutex +} + +// newBatchSender builds a sender for one data type. batchSize and timeout are +// per-type knobs each replayer (measure/stream/trace) supplies from its own +// constants; they coincide at 2000/30s today, which is why unparam is silenced. +// +//nolint:unparam +func newBatchSender(client queue.Client, topic bus.Topic, batchSize int, timeout time.Duration) *batchSender { + return &batchSender{ + client: client, + publisher: client.NewBatchPublisher(timeout), + pipeline: newConfirmPipeline(), + topic: topic, + batch: make([]bus.Message, 0, batchSize), + timeout: timeout, + batchSize: batchSize, + } +} + +// enqueue buffers msg and, when the batch is full, sends and asynchronously +// confirms it. The returned error is non-nil only when the in-flight bound +// forced it to wait on (and surface) an earlier batch's failure. +func (s *batchSender) enqueue(ctx context.Context, msg bus.Message) error { + s.mu.Lock() + s.batch = append(s.batch, msg) + full := len(s.batch) >= s.batchSize + s.mu.Unlock() + if full { + return s.flush(ctx) + } + return nil +} + +// flush sends the buffered batch on the current publisher, rotates in a fresh +// publisher for the next batch, and confirms the sent batch (closing its stream +// to collect the per-node delivery result) in the background. 
The returned error +// aborts the part promptly: it is this batch's synchronous send error, or — when +// the in-flight bound made it wait — an earlier batch's surfaced result (which +// takes priority, preserving send order). It is a no-op when the buffer is empty. +func (s *batchSender) flush(ctx context.Context) error { + pending := s.take() + if len(pending) == 0 { + return nil + } + _, pubErr := s.publisher.Publish(ctx, s.topic, pending...) + pub := s.publisher + s.publisher = s.client.NewBatchPublisher(s.timeout) + ch := make(chan error, 1) + go func() { + // The pipeline blocks in add/drain until exactly one value arrives on ch, so + // the send must happen on every path: a deferred send (with recover) + // guarantees it even if pub.Close panics, turning a would-be deadlock into a + // surfaced error. + var out error + defer func() { + if rec := recover(); rec != nil { + out = fmt.Errorf("row-replay batch confirm panicked: %v", rec) + } + ch <- out + }() + cee, closeErr := pub.Close() + err := pubErr + if err == nil { + err = closeErr + } + out = newNodeReplayError(cee, err) + }() + // Abort promptly: an earlier in-flight batch's failure (via the bound) wins, + // otherwise this batch's own synchronous send error. + if awaited := s.pipeline.add(ch); awaited != nil { Review Comment: When `Publish` fails (`pubErr != nil`), the receiver never saw the batch, so `cee` from the subsequent `pub.Close()` is meaningless — the close-side per-node errors reflect rows the receiver processed, which is none here. Passing a potentially non-nil `cee` alongside `pubErr` to `newNodeReplayError` can produce a confusing error that reports both a global send failure and node-specific rejections. 
Consider zeroing `cee` when `pubErr != nil`:

```go
err := pubErr
if err == nil {
	err = closeErr
} else {
	cee = nil // publish failed — receiver saw nothing, per-node cee is noise
}
out = newNodeReplayError(cee, err)
```

########## banyand/backup/lifecycle/row_replay_trace.go: ########## @@ -141,40 +112,8 @@ func (r *traceRowReplayer) replayPart(ctx context.Context, partPath string) (int it := reader.Iterator() defer it.Close() - - rowCount := 0 - for it.Next() { - row := it.Row() - if rowErr := r.publishRow(ctx, row); rowErr != nil { - pos := it.Position() - r.logger.Warn().Err(rowErr). - Str("group", r.group). - Str("trace", r.traceName). - Str("part", partPath). - Int("block_idx", pos.BlockIdx). - Int("row_idx", pos.RowIdx). - Int("rows_published", rowCount). - Msg("trace row-replay aborted mid-part on publish error; will retry on resume") - return rowCount, rowErr - } - rowCount++ - } - if iterErr := it.Err(); iterErr != nil { - pos := it.Position() - r.logger.Warn().Err(iterErr). - Str("group", r.group). - Str("trace", r.traceName). - Str("part", partPath). - Int("block_idx", pos.BlockIdx). - Int("row_idx", pos.RowIdx). - Int("rows_published", rowCount). - Msg("trace row-replay aborted mid-part; will retry on resume") - return rowCount, iterErr - } - if r.counter != nil { - atomic.AddUint64(r.counter, 1) - } - return rowCount, nil + return r.sender.replay(ctx, r.logger, r.group, partPath, r.counter, it, Review Comment: The old per-type abort log included `Str("trace", r.traceName)`. The shared `warnAbort` closure in `batchSender.replay()` only logs `group` and `part`, so this field is silently dropped for trace replays. If monitoring queries or alerts filter on `trace=`, this is a quiet observability regression. Consider embedding the trace name in the `part` label (e.g. `r.traceName+"/"+partPath`) or adding an optional label parameter to `replay()`.
########## banyand/backup/lifecycle/row_replay_test.go: ########## @@ -24,15 +24,18 @@ import ( gofs "io/fs" "path/filepath" "strconv" + "sync/atomic" "testing" "time" "github.com/onsi/gomega" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + gomock "go.uber.org/mock/gomock" Review Comment: Project convention uses explicit aliases only for the project's protobuf packages. `go.uber.org/mock/gomock` is already named `gomock` by its package declaration — the alias is redundant. Use the bare import:

```go
"go.uber.org/mock/gomock"
```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
