mrproliu commented on code in PR #1154:
URL: 
https://github.com/apache/skywalking-banyandb/pull/1154#discussion_r3354197142


##########
banyand/backup/lifecycle/row_replay_pipeline.go:
##########
@@ -0,0 +1,357 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package lifecycle
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/banyand/internal/dump"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
+       "github.com/apache/skywalking-banyandb/pkg/bus"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// rowReplayMaxInflight bounds how many row-replay batches may be sent but not
+// yet confirmed at once. A depth of 2 lets one batch's receiver-side write
+// overlap with the building and sending of the next batch (double buffering)
+// while capping the extra sender/receiver memory and write contention.
+const rowReplayMaxInflight = 2
+
+// nodeReplayError reports a failed row-replay batch. It distinguishes a global
+// send/confirm failure (err: the whole publish/close failed, or a row could 
not
+// be built or routed) from per-node delivery failures (cee: the receiver
+// rejected rows on specific nodes, keyed by node). At least one is set 
whenever
+// this error is returned; newNodeReplayError yields nil when neither is.
+type nodeReplayError struct {
+       err error
+       cee map[string]*common.Error
+}
+
+func (e *nodeReplayError) Error() string {
+       if e.err != nil {
+               return e.err.Error()
+       }
+       return fmt.Sprintf("%d node error(s)", len(e.cee))
+}
+
+func (e *nodeReplayError) Unwrap() error {
+       return e.err
+}
+
+// newNodeReplayError reduces a confirmed batch's result to an error: nil when
+// the batch fully succeeded (no global error and no per-node errors), 
otherwise
+// a *nodeReplayError carrying whichever failures occurred.
+func newNodeReplayError(cee map[string]*common.Error, err error) error {
+       if err == nil && len(cee) == 0 {
+               return nil
+       }
+       return &nodeReplayError{cee: cee, err: err}
+}
+
+// recordReplayNodeErrors records any per-node delivery errors carried by err
+// into progress so they surface in the migration report. It is a no-op for a
+// global-only error.
+func recordReplayNodeErrors(progress *Progress, group string, err error) {
+       var nre *nodeReplayError
+       if errors.As(err, &nre) && len(nre.cee) > 0 {
+               progress.RecordRowReplayNodeErrors(group, nre.cee)
+       }
+}
+
+// confirmPipeline bounds the number of in-flight (sent but unconfirmed)
+// row-replay batches. Each batch is confirmed on its own client-streaming
+// publisher in a background goroutine that delivers its error (nil on success)
+// on a channel; the pipeline keeps those channels in FIFO order so
+// confirmations are observed in send order. It is driven by a single goroutine
+// (the part's replay loop) and needs no internal locking.
+type confirmPipeline struct {
+       inflight []chan error
+       depth    int
+       // waited accumulates, over the pipeline's lifetime, the wall time the 
driving
+       // goroutine spent blocked on confirmations (the in-flight bound in add 
plus
+       // drain). Since that goroutine is either building or blocked here, it 
measures
+       // how long the sender was starved waiting on the receiver.
+       waited time.Duration
+}
+
+func newConfirmPipeline() *confirmPipeline {
+       return &confirmPipeline{depth: rowReplayMaxInflight}
+}
+
+// add registers a batch confirmation that will be delivered on ch. When the
+// pipeline is already full it first waits for the oldest in-flight 
confirmation
+// so no more than depth batches are ever outstanding, returning that awaited
+// error (a non-nil error means an earlier batch must abort the part).
+func (p *confirmPipeline) add(ch chan error) error {
+       var awaited error
+       if len(p.inflight) >= p.depth {
+               start := time.Now()
+               awaited = <-p.inflight[0]
+               p.waited += time.Since(start)
+               p.inflight = p.inflight[1:]
+       }
+       p.inflight = append(p.inflight, ch)
+       return awaited
+}
+
+// drain waits for every remaining in-flight confirmation and returns the first
+// failing error (or nil when all succeeded). It is safe to call more than 
once;
+// a drained pipeline is empty.
+func (p *confirmPipeline) drain() error {
+       var first error
+       for _, ch := range p.inflight {
+               start := time.Now()
+               out := <-ch
+               p.waited += time.Since(start)
+               if first == nil && out != nil {
+                       first = out
+               }
+       }
+       p.inflight = nil
+       return first
+}
+
+// batchSender buffers row-replay messages and, once a batch fills, sends it 
on a
+// client-streaming publisher and confirms it asynchronously through a bounded
+// confirmPipeline. Each batch is published on its own publisher (rotated in
+// immediately) and confirmed in the background, so one batch's receiver-side
+// write overlaps with building and sending the next. It owns the whole 
publisher
+// lifecycle (open, rotate, close) so the per-data-type replayers only build
+// messages and enqueue them. batchSize and timeout are supplied by each data
+// type. A sender is driven by a single goroutine; only the buffer is
+// mutex-guarded, while each background confirmation touches just its own
+// captured publisher and channel.
+type batchSender struct {
+       client    queue.Client
+       publisher queue.BatchPublisher
+       pipeline  *confirmPipeline
+       batch     []bus.Message
+       topic     bus.Topic
+       timeout   time.Duration
+       batchSize int
+       mu        sync.Mutex
+}
+
+// newBatchSender builds a sender for one data type. batchSize and timeout are
+// per-type knobs each replayer (measure/stream/trace) supplies from its own
+// constants; they coincide at 2000/30s today, which is why unparam is 
silenced.
+//
+//nolint:unparam

Review Comment:
   since the const may changed from each data type, so keep the parameter and 
remove the `nolint` lable. 



##########
banyand/backup/lifecycle/row_replay_pipeline.go:
##########
@@ -0,0 +1,357 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package lifecycle
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/banyand/internal/dump"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
+       "github.com/apache/skywalking-banyandb/pkg/bus"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// rowReplayMaxInflight bounds how many row-replay batches may be sent but not
+// yet confirmed at once. A depth of 2 lets one batch's receiver-side write
+// overlap with the building and sending of the next batch (double buffering)
+// while capping the extra sender/receiver memory and write contention.
+const rowReplayMaxInflight = 2
+
+// nodeReplayError reports a failed row-replay batch. It distinguishes a global
+// send/confirm failure (err: the whole publish/close failed, or a row could 
not
+// be built or routed) from per-node delivery failures (cee: the receiver
+// rejected rows on specific nodes, keyed by node). At least one is set 
whenever
+// this error is returned; newNodeReplayError yields nil when neither is.
+type nodeReplayError struct {
+       err error
+       cee map[string]*common.Error
+}
+
+func (e *nodeReplayError) Error() string {
+       if e.err != nil {
+               return e.err.Error()
+       }
+       return fmt.Sprintf("%d node error(s)", len(e.cee))
+}
+
+func (e *nodeReplayError) Unwrap() error {
+       return e.err
+}
+
+// newNodeReplayError reduces a confirmed batch's result to an error: nil when
+// the batch fully succeeded (no global error and no per-node errors), 
otherwise
+// a *nodeReplayError carrying whichever failures occurred.
+func newNodeReplayError(cee map[string]*common.Error, err error) error {
+       if err == nil && len(cee) == 0 {
+               return nil
+       }
+       return &nodeReplayError{cee: cee, err: err}
+}
+
+// recordReplayNodeErrors records any per-node delivery errors carried by err
+// into progress so they surface in the migration report. It is a no-op for a
+// global-only error.
+func recordReplayNodeErrors(progress *Progress, group string, err error) {
+       var nre *nodeReplayError
+       if errors.As(err, &nre) && len(nre.cee) > 0 {
+               progress.RecordRowReplayNodeErrors(group, nre.cee)
+       }
+}
+
+// confirmPipeline bounds the number of in-flight (sent but unconfirmed)
+// row-replay batches. Each batch is confirmed on its own client-streaming
+// publisher in a background goroutine that delivers its error (nil on success)
+// on a channel; the pipeline keeps those channels in FIFO order so
+// confirmations are observed in send order. It is driven by a single goroutine
+// (the part's replay loop) and needs no internal locking.
+type confirmPipeline struct {
+       inflight []chan error
+       depth    int
+       // waited accumulates, over the pipeline's lifetime, the wall time the 
driving
+       // goroutine spent blocked on confirmations (the in-flight bound in add 
plus
+       // drain). Since that goroutine is either building or blocked here, it 
measures
+       // how long the sender was starved waiting on the receiver.
+       waited time.Duration
+}
+
+func newConfirmPipeline() *confirmPipeline {
+       return &confirmPipeline{depth: rowReplayMaxInflight}
+}
+
+// add registers a batch confirmation that will be delivered on ch. When the
+// pipeline is already full it first waits for the oldest in-flight 
confirmation
+// so no more than depth batches are ever outstanding, returning that awaited
+// error (a non-nil error means an earlier batch must abort the part).
+func (p *confirmPipeline) add(ch chan error) error {
+       var awaited error
+       if len(p.inflight) >= p.depth {
+               start := time.Now()
+               awaited = <-p.inflight[0]
+               p.waited += time.Since(start)
+               p.inflight = p.inflight[1:]
+       }
+       p.inflight = append(p.inflight, ch)
+       return awaited
+}
+
+// drain waits for every remaining in-flight confirmation and returns the first
+// failing error (or nil when all succeeded). It is safe to call more than 
once;
+// a drained pipeline is empty.
+func (p *confirmPipeline) drain() error {
+       var first error
+       for _, ch := range p.inflight {
+               start := time.Now()
+               out := <-ch
+               p.waited += time.Since(start)
+               if first == nil && out != nil {
+                       first = out
+               }
+       }
+       p.inflight = nil
+       return first
+}
+
+// batchSender buffers row-replay messages and, once a batch fills, sends it 
on a
+// client-streaming publisher and confirms it asynchronously through a bounded
+// confirmPipeline. Each batch is published on its own publisher (rotated in
+// immediately) and confirmed in the background, so one batch's receiver-side
+// write overlaps with building and sending the next. It owns the whole 
publisher
+// lifecycle (open, rotate, close) so the per-data-type replayers only build
+// messages and enqueue them. batchSize and timeout are supplied by each data
+// type. A sender is driven by a single goroutine; only the buffer is
+// mutex-guarded, while each background confirmation touches just its own
+// captured publisher and channel.
+type batchSender struct {
+       client    queue.Client
+       publisher queue.BatchPublisher
+       pipeline  *confirmPipeline
+       batch     []bus.Message
+       topic     bus.Topic
+       timeout   time.Duration
+       batchSize int
+       mu        sync.Mutex
+}
+
+// newBatchSender builds a sender for one data type. batchSize and timeout are
+// per-type knobs each replayer (measure/stream/trace) supplies from its own
+// constants; they coincide at 2000/30s today, which is why unparam is 
silenced.
+//
+//nolint:unparam
+func newBatchSender(client queue.Client, topic bus.Topic, batchSize int, 
timeout time.Duration) *batchSender {
+       return &batchSender{
+               client:    client,
+               publisher: client.NewBatchPublisher(timeout),
+               pipeline:  newConfirmPipeline(),
+               topic:     topic,
+               batch:     make([]bus.Message, 0, batchSize),
+               timeout:   timeout,
+               batchSize: batchSize,
+       }
+}
+
+// enqueue buffers msg and, when the batch is full, sends and asynchronously
+// confirms it. The returned error is non-nil only when the in-flight bound
+// forced it to wait on (and surface) an earlier batch's failure.
+func (s *batchSender) enqueue(ctx context.Context, msg bus.Message) error {
+       s.mu.Lock()
+       s.batch = append(s.batch, msg)
+       full := len(s.batch) >= s.batchSize
+       s.mu.Unlock()
+       if full {
+               return s.flush(ctx)
+       }
+       return nil
+}
+
+// flush sends the buffered batch on the current publisher, rotates in a fresh
+// publisher for the next batch, and confirms the sent batch (closing its 
stream
+// to collect the per-node delivery result) in the background. The returned 
error
+// aborts the part promptly: it is this batch's synchronous send error, or — 
when
+// the in-flight bound made it wait — an earlier batch's surfaced result (which
+// takes priority, preserving send order). It is a no-op when the buffer is 
empty.
+func (s *batchSender) flush(ctx context.Context) error {
+       pending := s.take()
+       if len(pending) == 0 {
+               return nil
+       }
+       _, pubErr := s.publisher.Publish(ctx, s.topic, pending...)
+       pub := s.publisher
+       s.publisher = s.client.NewBatchPublisher(s.timeout)
+       ch := make(chan error, 1)
+       go func() {
+               // The pipeline blocks in add/drain until exactly one value 
arrives on ch, so
+               // the send must happen on every path: a deferred send (with 
recover)
+               // guarantees it even if pub.Close panics, turning a would-be 
deadlock into a
+               // surfaced error.
+               var out error
+               defer func() {
+                       if rec := recover(); rec != nil {
+                               out = fmt.Errorf("row-replay batch confirm 
panicked: %v", rec)
+                       }
+                       ch <- out
+               }()
+               cee, closeErr := pub.Close()
+               err := pubErr
+               if err == nil {
+                       err = closeErr
+               }
+               out = newNodeReplayError(cee, err)
+       }()
+       // Abort promptly: an earlier in-flight batch's failure (via the bound) 
wins,
+       // otherwise this batch's own synchronous send error.
+       if awaited := s.pipeline.add(ch); awaited != nil {

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to