mrproliu commented on code in PR #1154: URL: https://github.com/apache/skywalking-banyandb/pull/1154#discussion_r3354194028
########## banyand/backup/lifecycle/row_replay_pipeline.go: ########## @@ -0,0 +1,357 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package lifecycle + +import ( + "context" + "errors" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/banyand/internal/dump" + "github.com/apache/skywalking-banyandb/banyand/queue" + "github.com/apache/skywalking-banyandb/pkg/bus" + "github.com/apache/skywalking-banyandb/pkg/logger" +) + +// rowReplayMaxInflight bounds how many row-replay batches may be sent but not +// yet confirmed at once. A depth of 2 lets one batch's receiver-side write +// overlap with the building and sending of the next batch (double buffering) +// while capping the extra sender/receiver memory and write contention. +const rowReplayMaxInflight = 2 + +// nodeReplayError reports a failed row-replay batch. It distinguishes a global +// send/confirm failure (err: the whole publish/close failed, or a row could not +// be built or routed) from per-node delivery failures (cee: the receiver +// rejected rows on specific nodes, keyed by node). 
At least one is set whenever +// this error is returned; newNodeReplayError yields nil when neither is. +type nodeReplayError struct { + err error + cee map[string]*common.Error +} + +func (e *nodeReplayError) Error() string { + if e.err != nil { + return e.err.Error() + } + return fmt.Sprintf("%d node error(s)", len(e.cee)) +} + +func (e *nodeReplayError) Unwrap() error { + return e.err +} + +// newNodeReplayError reduces a confirmed batch's result to an error: nil when +// the batch fully succeeded (no global error and no per-node errors), otherwise +// a *nodeReplayError carrying whichever failures occurred. +func newNodeReplayError(cee map[string]*common.Error, err error) error { + if err == nil && len(cee) == 0 { + return nil + } + return &nodeReplayError{cee: cee, err: err} +} + +// recordReplayNodeErrors records any per-node delivery errors carried by err +// into progress so they surface in the migration report. It is a no-op for a +// global-only error. +func recordReplayNodeErrors(progress *Progress, group string, err error) { + var nre *nodeReplayError + if errors.As(err, &nre) && len(nre.cee) > 0 { + progress.RecordRowReplayNodeErrors(group, nre.cee) + } +} + +// confirmPipeline bounds the number of in-flight (sent but unconfirmed) +// row-replay batches. Each batch is confirmed on its own client-streaming +// publisher in a background goroutine that delivers its error (nil on success) +// on a channel; the pipeline keeps those channels in FIFO order so +// confirmations are observed in send order. It is driven by a single goroutine +// (the part's replay loop) and needs no internal locking. +type confirmPipeline struct { + inflight []chan error + depth int + // waited accumulates, over the pipeline's lifetime, the wall time the driving + // goroutine spent blocked on confirmations (the in-flight bound in add plus + // drain). 
Since that goroutine is either building or blocked here, it measures + // how long the sender was starved waiting on the receiver. + waited time.Duration +} + +func newConfirmPipeline() *confirmPipeline { + return &confirmPipeline{depth: rowReplayMaxInflight} +} + +// add registers a batch confirmation that will be delivered on ch. When the +// pipeline is already full it first waits for the oldest in-flight confirmation +// so no more than depth batches are ever outstanding, returning that awaited +// error (a non-nil error means an earlier batch must abort the part). +func (p *confirmPipeline) add(ch chan error) error { + var awaited error + if len(p.inflight) >= p.depth { + start := time.Now() + awaited = <-p.inflight[0] + p.waited += time.Since(start) + p.inflight = p.inflight[1:] + } + p.inflight = append(p.inflight, ch) + return awaited +} + +// drain waits for every remaining in-flight confirmation and returns the first +// failing error (or nil when all succeeded). It is safe to call more than once; +// a drained pipeline is empty. +func (p *confirmPipeline) drain() error { + var first error + for _, ch := range p.inflight { + start := time.Now() + out := <-ch + p.waited += time.Since(start) + if first == nil && out != nil { + first = out + } + } + p.inflight = nil + return first +} + +// batchSender buffers row-replay messages and, once a batch fills, sends it on a +// client-streaming publisher and confirms it asynchronously through a bounded +// confirmPipeline. Each batch is published on its own publisher (rotated in +// immediately) and confirmed in the background, so one batch's receiver-side +// write overlaps with building and sending the next. It owns the whole publisher +// lifecycle (open, rotate, close) so the per-data-type replayers only build +// messages and enqueue them. batchSize and timeout are supplied by each data +// type. 
A sender is driven by a single goroutine; only the buffer is +// mutex-guarded, while each background confirmation touches just its own +// captured publisher and channel. +type batchSender struct { + client queue.Client + publisher queue.BatchPublisher + pipeline *confirmPipeline + batch []bus.Message + topic bus.Topic + timeout time.Duration + batchSize int + mu sync.Mutex Review Comment: Removed the mutex: the sender is driven by a single goroutine and the background confirmations never touch the batch buffer, so the buffer needs no lock. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
