Copilot commented on code in PR #815:
URL:
https://github.com/apache/skywalking-banyandb/pull/815#discussion_r2443574184
##########
banyand/internal/sidx/block_scanner.go:
##########
@@ -127,39 +128,44 @@ func (bsn *blockScanner) scan(ctx context.Context,
blockCh chan *blockScanResult
}
p := it.piHeap[0]
- batch.bss = append(batch.bss, blockScanResult{
- p: p.p,
- })
- bs := &batch.bss[len(batch.bss)-1]
- bs.bm.copyFrom(p.curBlock)
+ // Get block size before adding to batch
+ blockSize := p.curBlock.uncompressedSize
+
+ // Check if adding this block would exceed quota
quota := bsn.pm.AvailableBytes()
- for i := range batch.bss {
- totalBlockBytes += batch.bss[i].bm.uncompressedSize
- if quota >= 0 && totalBlockBytes > uint64(quota) {
- err := fmt.Errorf("block scan quota exceeded:
used %d bytes, quota is %d bytes", totalBlockBytes, quota)
- batch.err = err
+ if quota >= 0 && totalBlockBytes+blockSize > uint64(quota) {
+ if len(batch.bss) > 0 {
+ // Send current batch without error
select {
case blockCh <- batch:
case <-ctx.Done():
releaseBlockScanResultBatch(batch)
- bsn.l.Warn().Err(err).Msg("quota
exceeded, context canceled")
}
return
}
+ // Batch is empty, send error
+ err := fmt.Errorf("sidx block scan quota exceeded:
block size %s, quota is %s", humanize.Bytes(blockSize),
humanize.Bytes(uint64(quota)))
+ batch.err = err
+ select {
+ case blockCh <- batch:
+ case <-ctx.Done():
+ releaseBlockScanResultBatch(batch)
+ return
+ }
+ return
}
+ // Quota OK, add block to batch
+ batch.bss = append(batch.bss, blockScanResult{
+ p: p.p,
+ })
+ bs := &batch.bss[len(batch.bss)-1]
+ bs.bm.copyFrom(p.curBlock)
+ totalBlockBytes += blockSize
+
+ // Check if batch is full
if len(batch.bss) >= cap(batch.bss) {
- if err := bsn.pm.AcquireResource(ctx, totalBlockBytes);
err != nil {
- batch.err = fmt.Errorf("cannot acquire
resource: %w", err)
- select {
- case blockCh <- batch:
- case <-ctx.Done():
- releaseBlockScanResultBatch(batch)
- bsn.l.Warn().Err(err).Msg("cannot
acquire resource")
- }
- return
- }
select {
case blockCh <- batch:
case <-ctx.Done():
Review Comment:
Resource reservation with the protector is no longer performed; the
AvailableBytes check alone does not acquire or hold any memory budget, so
concurrent scans can oversubscribe it. Before emitting a full batch, call
AcquireResource(ctx, totalBlockBytes) and propagate any error; ensure the
corresponding release happens once the batch is fully consumed.
##########
banyand/trace/streaming_pipeline.go:
##########
@@ -912,76 +744,73 @@ func (t *trace) scanTraceIDs(ctx context.Context, parts
[]*part, qo queryOptions
tstIter.init(bma, parts, sortedIDs)
if initErr := tstIter.Error(); initErr != nil {
- err = fmt.Errorf("cannot init tstIter: %w", initErr)
+ spanErr = fmt.Errorf("cannot init tstIter: %w", initErr)
+ select {
+ case out <- scanCursorResult{err: spanErr}:
+ case <-ctx.Done():
+ }
return
}
- var (
- spanBlockBytes uint64
- hit int
- )
-
quota := t.pm.AvailableBytes()
+ hit := 0
for tstIter.nextBlock() {
if hit%checkDoneEvery == 0 {
select {
case <-ctx.Done():
- err = pkgerrors.WithMessagef(ctx.Err(),
"interrupt: scanned %d blocks, remained %d/%d parts to scan",
- len(result),
len(tstIter.piPool)-tstIter.idx, len(tstIter.piPool))
+ spanErr = pkgerrors.WithMessagef(ctx.Err(),
"interrupt: scanned %d blocks, remained %d/%d parts to scan",
+ cursorCount,
len(tstIter.piPool)-tstIter.idx, len(tstIter.piPool))
return
default:
}
}
hit++
+ // Create block cursor and get size before checking quota
bc := generateBlockCursor()
p := tstIter.piPool[tstIter.idx]
bc.init(p.p, p.curBlock, qo)
- result = append(result, bc)
+ blockSize := bc.bm.uncompressedSpanSizeBytes
+
+ // Check if adding this block would exceed quota
+ if quota >= 0 && spanBlockBytes+blockSize > uint64(quota) {
+ releaseBlockCursor(bc)
+ if cursorCount > 0 {
+ // Have results, return them successfully by
just closing channel
+ return
+ }
+ // No results, send error
+ spanErr = fmt.Errorf("block scan quota exceeded: block
size %d bytes, quota is %d bytes", blockSize, quota)
+ select {
+ case out <- scanCursorResult{err: spanErr}:
+ case <-ctx.Done():
+ }
+ return
+ }
+
+ // Quota OK, send cursor
if recordBlock != nil {
recordBlock(bc)
}
- spanBlockBytes += bc.bm.uncompressedSpanSizeBytes
+ spanBlockBytes += blockSize
+ cursorCount++
- if quota >= 0 && spanBlockBytes > uint64(quota) {
- err = fmt.Errorf("block scan quota exceeded: used %d
bytes, quota is %d bytes", spanBlockBytes, quota)
+ select {
+ case out <- scanCursorResult{cursor: bc}:
+ case <-ctx.Done():
+ releaseBlockCursor(bc)
+ spanErr = pkgerrors.WithMessagef(ctx.Err(), "interrupt:
scanned %d blocks", cursorCount)
return
}
Review Comment:
Resource accounting via protector.AcquireResource was removed; this makes
quota checks best-effort and can race with other scans. After computing
spanBlockBytes for the batch, acquire the resource (e.g., before sending the
first cursor) and propagate errors; otherwise we can exceed the global budget
under concurrency.
##########
banyand/trace/streaming_pipeline.go:
##########
@@ -457,11 +467,6 @@ func (r *sidxStreamRunner) run(out chan<- traceBatch) {
}
}
- if added && r.maxTraceSize > 0 && r.total >= r.maxTraceSize {
- r.limitReached = true
- break
- }
-
if err := r.advanceShard(shard); err != nil {
Review Comment:
The overall maxTraceSize cap is no longer enforced (previous early breaks
are removed), changing behavior from 'limit total trace IDs' to 'limit only
per-batch via MaxBatchSize'. If this is unintended, reintroduce a check like
'if r.maxTraceSize > 0 && r.total >= r.maxTraceSize { return }'; if it is
intended, update tracing/tags and docs to reflect the removal of a global cap.
##########
banyand/trace/streaming_pipeline.go:
##########
@@ -629,261 +668,54 @@ func (t *trace) startBlockScanStage(
parts []*part,
qo queryOptions,
batches <-chan traceBatch,
- maxTraceSize int,
) <-chan *scanBatch {
out := make(chan *scanBatch)
- workerCount := cgroups.CPUs()
- if workerCount < 1 {
- workerCount = 1
- }
-
- jobCh := make(chan traceBatch)
- resultCh := make(chan *scanBatch)
-
- var workerGroup sync.WaitGroup
- workerGroup.Add(workerCount)
-
- t.launchBlockScanWorkers(ctx, parts, qo, workerCount, jobCh, resultCh,
&workerGroup)
-
- go func() {
- workerGroup.Wait()
- close(resultCh)
- }()
-
- go func() {
- t.dispatchTraceBatches(ctx, batches, jobCh, maxTraceSize)
- }()
-
go func() {
defer close(out)
- t.collectScanResults(ctx, resultCh, out)
- }()
-
- return out
-}
-
-func (t *trace) launchBlockScanWorkers(
- ctx context.Context,
- parts []*part,
- qo queryOptions,
- workerCount int,
- jobCh <-chan traceBatch,
- resultCh chan<- *scanBatch,
- wg *sync.WaitGroup,
-) {
- for i := 0; i < workerCount; i++ {
- go func() {
- defer wg.Done()
- for batch := range jobCh {
- sb := t.processTraceBatch(ctx, parts, qo, batch)
- if sb == nil {
- continue
- }
+ for batch := range batches {
+ if batch.err != nil {
select {
case <-ctx.Done():
- if sb.err == nil {
- releaseBlockCursors(sb.cursors)
- }
return
- case resultCh <- sb:
+ case out <- &scanBatch{traceBatch: batch, err:
batch.err}:
}
+ continue
}
- }()
- }
-}
-
-func (t *trace) dispatchTraceBatches(
- ctx context.Context,
- batches <-chan traceBatch,
- jobCh chan<- traceBatch,
- maxTraceSize int,
-) {
- defer close(jobCh)
-
- total := 0
- sequence := 0
- limitReached := false
-
- for batch := range batches {
- if limitReached {
- continue
- }
-
- if batch.err != nil {
- batch.seq = sequence
- sequence++
- if !sendTraceBatch(ctx, jobCh, batch) {
- return
- }
- limitReached = true
- continue
- }
- if len(batch.traceIDs) == 0 {
- continue
- }
-
- var reached bool
- batch, total, reached = limitTraceBatch(batch, maxTraceSize,
total)
- if len(batch.traceIDs) == 0 {
- if reached {
- limitReached = true
+ // Create the cursor channel and scanBatch
+ cursorCh := make(chan scanCursorResult)
+ sb := &scanBatch{
+ traceBatch: batch,
+ cursorCh: cursorCh,
}
- continue
- }
-
- batch.seq = sequence
- sequence++
- if !sendTraceBatch(ctx, jobCh, batch) {
- return
- }
- if reached {
- limitReached = true
- }
- }
-}
-
-func limitTraceBatch(batch traceBatch, maxTraceSize int, total int)
(traceBatch, int, bool) {
- if maxTraceSize <= 0 {
- total += len(batch.traceIDs)
- return batch, total, false
- }
- remaining := maxTraceSize - total
- if remaining <= 0 {
- batch.traceIDs = batch.traceIDs[:0]
- return batch, total, true
- }
-
- if len(batch.traceIDs) > remaining {
- batch.traceIDs = append([]string(nil),
batch.traceIDs[:remaining]...)
- }
-
- total += len(batch.traceIDs)
-
- return batch, total, total >= maxTraceSize
-}
-
-func sendTraceBatch(ctx context.Context, jobCh chan<- traceBatch, batch
traceBatch) bool {
- select {
- case <-ctx.Done():
- return false
- case jobCh <- batch:
- return true
- }
-}
-
-func (t *trace) collectScanResults(
- ctx context.Context,
- resultCh <-chan *scanBatch,
- out chan<- *scanBatch,
-) {
- pending := make(map[int]*scanBatch)
- nextSeq := 0
-
- for {
- select {
- case <-ctx.Done():
- releasePendingBatches(pending)
- return
- case sb, ok := <-resultCh:
- if !ok {
- flushPendingBatches(out, pending, nextSeq)
- return
- }
- if sb == nil {
- continue
- }
- if !t.processScanBatch(ctx, sb, pending, &nextSeq, out)
{
- releasePendingBatches(pending)
+ // Send batch downstream first so consumer can start
reading
+ select {
+ case <-ctx.Done():
+ close(cursorCh)
return
+ case out <- sb:
}
- }
- }
-}
-func (t *trace) processScanBatch(
- ctx context.Context,
- sb *scanBatch,
- pending map[int]*scanBatch,
- nextSeq *int,
- out chan<- *scanBatch,
-) bool {
- if sb.err != nil {
- releasePendingBatches(pending)
- return sendScanBatch(ctx, out, sb)
- }
-
- if sb.seq == *nextSeq {
- if !sendScanBatch(ctx, out, sb) {
- return false
+ // Now scan inline and populate the channel
+ t.scanTraceIDsInline(ctx, parts, qo, batch.traceIDs,
cursorCh)
+ close(cursorCh)
}
- (*nextSeq)++
- return t.flushReadyBatches(ctx, pending, nextSeq, out)
- }
-
- pending[sb.seq] = sb
- return true
-}
-
-func (t *trace) flushReadyBatches(
- ctx context.Context,
- pending map[int]*scanBatch,
- nextSeq *int,
- out chan<- *scanBatch,
-) bool {
- for {
- sb, exists := pending[*nextSeq]
- if !exists {
- return true
- }
- delete(pending, *nextSeq)
- if !sendScanBatch(ctx, out, sb) {
- releasePendingBatches(pending)
- return false
- }
- (*nextSeq)++
- }
-}
-
-func flushPendingBatches(out chan<- *scanBatch, pending map[int]*scanBatch,
nextSeq int) {
- for {
- sb, exists := pending[nextSeq]
- if !exists {
- break
- }
- out <- sb
- delete(pending, nextSeq)
- nextSeq++
- }
-}
+ }()
-func sendScanBatch(ctx context.Context, out chan<- *scanBatch, sb *scanBatch)
bool {
- select {
- case <-ctx.Done():
- if sb.err == nil {
- releaseBlockCursors(sb.cursors)
- }
- return false
- case out <- sb:
- return true
- }
+ return out
}
-func releasePendingBatches(pending map[int]*scanBatch) {
- for seq, sb := range pending {
- if sb != nil && sb.err == nil {
- releaseBlockCursors(sb.cursors)
- }
- delete(pending, seq)
- }
+type scanCursorResult struct {
+ cursor *blockCursor
+ err error
}
-func (t *trace) scanTraceIDs(ctx context.Context, parts []*part, qo
queryOptions, traceIDs []string) (result []*blockCursor, err error) {
+func (t *trace) scanTraceIDsInline(ctx context.Context, parts []*part, qo
queryOptions, traceIDs []string, out chan<- scanCursorResult) {
if len(parts) == 0 || len(traceIDs) == 0 {
- return nil, nil
+ return
}
Review Comment:
Resource accounting via protector.AcquireResource was removed; this makes
quota checks best-effort and can race with other scans. After computing
spanBlockBytes for the batch, acquire the resource (e.g., before sending the
first cursor) and propagate errors; otherwise we can exceed the global budget
under concurrency.
##########
banyand/trace/query.go:
##########
@@ -99,7 +100,8 @@ func (t *trace) Query(ctx context.Context, tqo
model.TraceQueryOptions) (model.T
parts := t.attachSnapshots(&result, tables, qo.minTimestamp,
qo.maxTimestamp)
- pipelineCtx, cancel := context.WithCancel(ctx)
+ // TODO: remove this once we have a proper timeout mechanism
+ pipelineCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
result.ctx = pipelineCtx
result.cancel = cancel
Review Comment:
Hardcoding a 10s timeout will cancel legitimate long-running queries and
changes the API behavior. Consider honoring the caller's ctx as-is, or make the
timeout configurable (e.g., via options) and default to no additional timeout.
##########
banyand/trace/streaming_pipeline.go:
##########
@@ -912,76 +744,73 @@ func (t *trace) scanTraceIDs(ctx context.Context, parts
[]*part, qo queryOptions
tstIter.init(bma, parts, sortedIDs)
if initErr := tstIter.Error(); initErr != nil {
- err = fmt.Errorf("cannot init tstIter: %w", initErr)
+ spanErr = fmt.Errorf("cannot init tstIter: %w", initErr)
+ select {
+ case out <- scanCursorResult{err: spanErr}:
+ case <-ctx.Done():
+ }
return
}
- var (
- spanBlockBytes uint64
- hit int
- )
-
quota := t.pm.AvailableBytes()
+ hit := 0
for tstIter.nextBlock() {
if hit%checkDoneEvery == 0 {
select {
case <-ctx.Done():
- err = pkgerrors.WithMessagef(ctx.Err(),
"interrupt: scanned %d blocks, remained %d/%d parts to scan",
- len(result),
len(tstIter.piPool)-tstIter.idx, len(tstIter.piPool))
+ spanErr = pkgerrors.WithMessagef(ctx.Err(),
"interrupt: scanned %d blocks, remained %d/%d parts to scan",
+ cursorCount,
len(tstIter.piPool)-tstIter.idx, len(tstIter.piPool))
return
default:
}
}
hit++
+ // Create block cursor and get size before checking quota
bc := generateBlockCursor()
p := tstIter.piPool[tstIter.idx]
bc.init(p.p, p.curBlock, qo)
- result = append(result, bc)
+ blockSize := bc.bm.uncompressedSpanSizeBytes
+
+ // Check if adding this block would exceed quota
+ if quota >= 0 && spanBlockBytes+blockSize > uint64(quota) {
+ releaseBlockCursor(bc)
+ if cursorCount > 0 {
+ // Have results, return them successfully by
just closing channel
+ return
+ }
+ // No results, send error
+ spanErr = fmt.Errorf("block scan quota exceeded: block
size %d bytes, quota is %d bytes", blockSize, quota)
+ select {
+ case out <- scanCursorResult{err: spanErr}:
+ case <-ctx.Done():
+ }
+ return
+ }
Review Comment:
Resource accounting via protector.AcquireResource was removed; this makes
quota checks best-effort and can race with other scans. After computing
spanBlockBytes for the batch, acquire the resource (e.g., before sending the
first cursor) and propagate errors; otherwise we can exceed the global budget
under concurrency.
##########
banyand/trace/metrics.go:
##########
@@ -292,6 +310,8 @@ func (s *supplier) newMetrics(p common.Position)
storage.Metrics {
totalSyncLoopStarted:
factory.NewCounter("total_sync_loop_started"),
totalSyncLoopFinished:
factory.NewCounter("total_sync_loop_finished"),
totalSyncLoopErr:
factory.NewCounter("total_sync_loop_err"),
+ totalSyncLoopLatency:
factory.NewCounter("total_sync_loop_latency"),
Review Comment:
Latency is tracked with a Counter, which loses distribution and percentile
information; use a Histogram or Summary metric for durations (seconds) and a
Counter for bytes. For example, expose total_sync_loop_latency_seconds as a
Histogram.
```suggestion
totalSyncLoopLatency:
factory.NewHistogram("total_sync_loop_latency", meter.DefaultLatencyBuckets...),
```
##########
banyand/trace/streaming_pipeline.go:
##########
@@ -629,261 +668,54 @@ func (t *trace) startBlockScanStage(
parts []*part,
qo queryOptions,
batches <-chan traceBatch,
- maxTraceSize int,
) <-chan *scanBatch {
out := make(chan *scanBatch)
- workerCount := cgroups.CPUs()
- if workerCount < 1 {
- workerCount = 1
- }
-
- jobCh := make(chan traceBatch)
- resultCh := make(chan *scanBatch)
-
- var workerGroup sync.WaitGroup
- workerGroup.Add(workerCount)
-
- t.launchBlockScanWorkers(ctx, parts, qo, workerCount, jobCh, resultCh,
&workerGroup)
-
- go func() {
- workerGroup.Wait()
- close(resultCh)
- }()
-
- go func() {
- t.dispatchTraceBatches(ctx, batches, jobCh, maxTraceSize)
- }()
-
go func() {
defer close(out)
- t.collectScanResults(ctx, resultCh, out)
- }()
-
- return out
-}
-
-func (t *trace) launchBlockScanWorkers(
- ctx context.Context,
- parts []*part,
- qo queryOptions,
- workerCount int,
- jobCh <-chan traceBatch,
- resultCh chan<- *scanBatch,
- wg *sync.WaitGroup,
-) {
- for i := 0; i < workerCount; i++ {
- go func() {
- defer wg.Done()
- for batch := range jobCh {
- sb := t.processTraceBatch(ctx, parts, qo, batch)
- if sb == nil {
- continue
- }
+ for batch := range batches {
+ if batch.err != nil {
select {
case <-ctx.Done():
- if sb.err == nil {
- releaseBlockCursors(sb.cursors)
- }
return
- case resultCh <- sb:
+ case out <- &scanBatch{traceBatch: batch, err:
batch.err}:
}
+ continue
}
- }()
- }
-}
-
-func (t *trace) dispatchTraceBatches(
- ctx context.Context,
- batches <-chan traceBatch,
- jobCh chan<- traceBatch,
- maxTraceSize int,
-) {
- defer close(jobCh)
-
- total := 0
- sequence := 0
- limitReached := false
-
- for batch := range batches {
- if limitReached {
- continue
- }
-
- if batch.err != nil {
- batch.seq = sequence
- sequence++
- if !sendTraceBatch(ctx, jobCh, batch) {
- return
- }
- limitReached = true
- continue
- }
- if len(batch.traceIDs) == 0 {
- continue
- }
-
- var reached bool
- batch, total, reached = limitTraceBatch(batch, maxTraceSize,
total)
- if len(batch.traceIDs) == 0 {
- if reached {
- limitReached = true
+ // Create the cursor channel and scanBatch
+ cursorCh := make(chan scanCursorResult)
+ sb := &scanBatch{
+ traceBatch: batch,
+ cursorCh: cursorCh,
}
- continue
- }
-
- batch.seq = sequence
- sequence++
- if !sendTraceBatch(ctx, jobCh, batch) {
- return
- }
- if reached {
- limitReached = true
- }
- }
-}
-
-func limitTraceBatch(batch traceBatch, maxTraceSize int, total int)
(traceBatch, int, bool) {
- if maxTraceSize <= 0 {
- total += len(batch.traceIDs)
- return batch, total, false
- }
- remaining := maxTraceSize - total
- if remaining <= 0 {
- batch.traceIDs = batch.traceIDs[:0]
- return batch, total, true
- }
-
- if len(batch.traceIDs) > remaining {
- batch.traceIDs = append([]string(nil),
batch.traceIDs[:remaining]...)
- }
-
- total += len(batch.traceIDs)
-
- return batch, total, total >= maxTraceSize
-}
-
-func sendTraceBatch(ctx context.Context, jobCh chan<- traceBatch, batch
traceBatch) bool {
- select {
- case <-ctx.Done():
- return false
- case jobCh <- batch:
- return true
- }
-}
-
-func (t *trace) collectScanResults(
- ctx context.Context,
- resultCh <-chan *scanBatch,
- out chan<- *scanBatch,
-) {
- pending := make(map[int]*scanBatch)
- nextSeq := 0
-
- for {
- select {
- case <-ctx.Done():
- releasePendingBatches(pending)
- return
- case sb, ok := <-resultCh:
- if !ok {
- flushPendingBatches(out, pending, nextSeq)
- return
- }
- if sb == nil {
- continue
- }
- if !t.processScanBatch(ctx, sb, pending, &nextSeq, out)
{
- releasePendingBatches(pending)
+ // Send batch downstream first so consumer can start
reading
+ select {
+ case <-ctx.Done():
+ close(cursorCh)
return
+ case out <- sb:
}
- }
- }
-}
-func (t *trace) processScanBatch(
- ctx context.Context,
- sb *scanBatch,
- pending map[int]*scanBatch,
- nextSeq *int,
- out chan<- *scanBatch,
-) bool {
- if sb.err != nil {
- releasePendingBatches(pending)
- return sendScanBatch(ctx, out, sb)
- }
-
- if sb.seq == *nextSeq {
- if !sendScanBatch(ctx, out, sb) {
- return false
+ // Now scan inline and populate the channel
+ t.scanTraceIDsInline(ctx, parts, qo, batch.traceIDs,
cursorCh)
+ close(cursorCh)
Review Comment:
Running the scan inline prevents this stage from consuming subsequent trace
batches from 'batches' until the current scan finishes, which serializes the
pipeline and reduces throughput. Move scanTraceIDsInline into its own
goroutine per batch so the stage keeps consuming 'batches' and upstream
producers aren't back-pressured by downstream scanning.
```suggestion
// Now scan in a separate goroutine to avoid blocking batch consumption
go func(batch traceBatch, cursorCh chan scanCursorResult) {
	t.scanTraceIDsInline(ctx, parts, qo, batch.traceIDs, cursorCh)
	close(cursorCh)
}(batch, cursorCh)
```
##########
banyand/trace/streaming_pipeline.go:
##########
@@ -383,48 +395,46 @@ func (r *sidxStreamRunner) prepare(instances []sidx.SIDX)
error {
})
}
+ // Add shards with data to the heap
for _, src := range sources {
heap.Push(r.heap, src.shard)
}
- if len(sources) == 0 {
- return nil
- }
-
- r.errEvents = make(chan sidxStreamError, len(sources))
+ // Create error event channel and start error forwarding goroutines
+ // for ALL error channels, even from shards without data
+ if len(allErrChannels) > 0 {
+ r.errEvents = make(chan sidxStreamError, len(allErrChannels))
- for _, src := range sources {
- if src.errCh == nil {
- continue
+ for _, errSrc := range allErrChannels {
+ r.errWg.Add(1)
+ go func(index int, ch <-chan error) {
+ defer r.errWg.Done()
+ forwardSIDXError(r.streamCtx, index, ch,
r.errEvents)
+ }(errSrc.idx, errSrc.errCh)
}
- r.errWg.Add(1)
- go func(index int, ch <-chan error) {
- defer r.errWg.Done()
- forwardSIDXError(r.streamCtx, index, ch, r.errEvents)
- }(src.idx, src.errCh)
- }
- go func() {
- r.errWg.Wait()
- close(r.errEvents)
- }()
+ go func() {
+ r.errWg.Wait()
+ close(r.errEvents)
+ }()
+ }
return nil
}
func (r *sidxStreamRunner) run(out chan<- traceBatch) {
- if r.heap.Len() == 0 {
+ // Always drain error events before returning, even on early exit
+ // Cancel first to stop SIDX goroutines, then drain any pending errors
+ defer func() {
r.cancel()
r.drainErrorEvents(out)
+ }()
+
+ if r.heap.Len() == 0 {
return
}
for r.heap.Len() > 0 {
Review Comment:
The overall maxTraceSize cap is no longer enforced (previous early breaks
are removed), changing behavior from 'limit total trace IDs' to 'limit only
per-batch via MaxBatchSize'. If this is unintended, reintroduce a check like
'if r.maxTraceSize > 0 && r.total >= r.maxTraceSize { return }'; if it is
intended, update tracing/tags and docs to reflect the removal of a global cap.
```suggestion
for r.heap.Len() > 0 {
// Enforce global maxTraceSize cap
if r.maxTraceSize > 0 && r.total >= r.maxTraceSize {
return
}
```
##########
banyand/trace/metrics.go:
##########
@@ -163,6 +165,20 @@ func (tst *tsTable) incTotalSyncLoopErr(delta int) {
tst.metrics.totalSyncLoopErr.Inc(float64(delta))
}
+func (tst *tsTable) incTotalSyncLoopLatency(delta float64) {
+ if tst == nil || tst.metrics == nil {
+ return
+ }
+ tst.metrics.totalSyncLoopLatency.Inc(delta)
+}
Review Comment:
Latency is tracked with a Counter, which loses distribution and percentile
information; use a Histogram or Summary metric for durations (seconds) and a
Counter for bytes. For example, expose total_sync_loop_latency_seconds as a
Histogram.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]