Copilot commented on code in PR #815:
URL: https://github.com/apache/skywalking-banyandb/pull/815#discussion_r2443574184


##########
banyand/internal/sidx/block_scanner.go:
##########
@@ -127,39 +128,44 @@ func (bsn *blockScanner) scan(ctx context.Context, blockCh chan *blockScanResult
                }
 
                p := it.piHeap[0]
-               batch.bss = append(batch.bss, blockScanResult{
-                       p: p.p,
-               })
-               bs := &batch.bss[len(batch.bss)-1]
-               bs.bm.copyFrom(p.curBlock)
 
+               // Get block size before adding to batch
+               blockSize := p.curBlock.uncompressedSize
+
+               // Check if adding this block would exceed quota
                quota := bsn.pm.AvailableBytes()
-               for i := range batch.bss {
-                       totalBlockBytes += batch.bss[i].bm.uncompressedSize
-                       if quota >= 0 && totalBlockBytes > uint64(quota) {
-                       err := fmt.Errorf("block scan quota exceeded: used %d bytes, quota is %d bytes", totalBlockBytes, quota)
-                               batch.err = err
+               if quota >= 0 && totalBlockBytes+blockSize > uint64(quota) {
+                       if len(batch.bss) > 0 {
+                               // Send current batch without error
                                select {
                                case blockCh <- batch:
                                case <-ctx.Done():
                                        releaseBlockScanResultBatch(batch)
-                                       bsn.l.Warn().Err(err).Msg("quota exceeded, context canceled")
                                }
                                return
                        }
+                       // Batch is empty, send error
+                       err := fmt.Errorf("sidx block scan quota exceeded: block size %s, quota is %s", humanize.Bytes(blockSize), humanize.Bytes(uint64(quota)))
+                       batch.err = err
+                       select {
+                       case blockCh <- batch:
+                       case <-ctx.Done():
+                               releaseBlockScanResultBatch(batch)
+                               return
+                       }
+                       return
                }
 
+               // Quota OK, add block to batch
+               batch.bss = append(batch.bss, blockScanResult{
+                       p: p.p,
+               })
+               bs := &batch.bss[len(batch.bss)-1]
+               bs.bm.copyFrom(p.curBlock)
+               totalBlockBytes += blockSize
+
+               // Check if batch is full
                if len(batch.bss) >= cap(batch.bss) {
-                       if err := bsn.pm.AcquireResource(ctx, totalBlockBytes); err != nil {
-                               batch.err = fmt.Errorf("cannot acquire resource: %w", err)
-                               select {
-                               case blockCh <- batch:
-                               case <-ctx.Done():
-                                       releaseBlockScanResultBatch(batch)
-                                       bsn.l.Warn().Err(err).Msg("cannot acquire resource")
-                               }
-                               return
-                       }
                        select {
                        case blockCh <- batch:
                        case <-ctx.Done():

Review Comment:
   Resource reservation with protector is no longer performed; AvailableBytes 
check alone doesn't acquire/hold memory budget and can lead to concurrent 
oversubscription. Before emitting a full batch, call AcquireResource(ctx, 
totalBlockBytes) and propagate any error; ensure corresponding release happens 
when the batch is fully consumed.
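
   A minimal, self-contained sketch of the reserve/emit/release pairing this comment describes. Only AcquireResource(ctx, totalBlockBytes) appears in the removed code; the Release method, the reservedBytes field, and the helper names are assumptions about how the reservation could follow the batch lifecycle, not the protector's actual API.
   ```go
package main

import (
        "context"
        "fmt"
)

// protector stands in for bsn.pm: AcquireResource mirrors the removed call,
// Release is an assumed counterpart for returning the budget.
type protector interface {
        AcquireResource(ctx context.Context, bytes uint64) error
        Release(bytes uint64)
}

// batch carries how many bytes were reserved so the consumer knows what to
// give back once it is done (hypothetical field, not in the PR).
type batch struct {
        reservedBytes uint64
}

// emit reserves the memory budget before handing a full batch downstream and
// returns the budget if nothing ever consumes the batch.
func emit(ctx context.Context, pm protector, out chan<- *batch, totalBlockBytes uint64) error {
        if err := pm.AcquireResource(ctx, totalBlockBytes); err != nil {
                return fmt.Errorf("cannot acquire resource: %w", err)
        }
        b := &batch{reservedBytes: totalBlockBytes}
        select {
        case out <- b:
                return nil
        case <-ctx.Done():
                pm.Release(totalBlockBytes) // nobody took the batch; give the budget back
                return ctx.Err()
        }
}

// consume releases the reservation only after the batch is fully processed.
func consume(pm protector, b *batch) {
        pm.Release(b.reservedBytes)
}

func main() {}
   ```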



##########
banyand/trace/streaming_pipeline.go:
##########
@@ -912,76 +744,73 @@ func (t *trace) scanTraceIDs(ctx context.Context, parts []*part, qo queryOptions
 
        tstIter.init(bma, parts, sortedIDs)
        if initErr := tstIter.Error(); initErr != nil {
-               err = fmt.Errorf("cannot init tstIter: %w", initErr)
+               spanErr = fmt.Errorf("cannot init tstIter: %w", initErr)
+               select {
+               case out <- scanCursorResult{err: spanErr}:
+               case <-ctx.Done():
+               }
                return
        }
 
-       var (
-               spanBlockBytes uint64
-               hit            int
-       )
-
        quota := t.pm.AvailableBytes()
+       hit := 0
 
        for tstIter.nextBlock() {
                if hit%checkDoneEvery == 0 {
                        select {
                        case <-ctx.Done():
-                               err = pkgerrors.WithMessagef(ctx.Err(), "interrupt: scanned %d blocks, remained %d/%d parts to scan",
-                                       len(result), len(tstIter.piPool)-tstIter.idx, len(tstIter.piPool))
+                               spanErr = pkgerrors.WithMessagef(ctx.Err(), "interrupt: scanned %d blocks, remained %d/%d parts to scan",
+                                       cursorCount, len(tstIter.piPool)-tstIter.idx, len(tstIter.piPool))
                                return
                        default:
                        }
                }
                hit++
 
+               // Create block cursor and get size before checking quota
                bc := generateBlockCursor()
                p := tstIter.piPool[tstIter.idx]
                bc.init(p.p, p.curBlock, qo)
-               result = append(result, bc)
+               blockSize := bc.bm.uncompressedSpanSizeBytes
+
+               // Check if adding this block would exceed quota
+               if quota >= 0 && spanBlockBytes+blockSize > uint64(quota) {
+                       releaseBlockCursor(bc)
+                       if cursorCount > 0 {
+                               // Have results, return them successfully by just closing channel
+                               return
+                       }
+                       // No results, send error
+                       spanErr = fmt.Errorf("block scan quota exceeded: block size %d bytes, quota is %d bytes", blockSize, quota)
+                       select {
+                       case out <- scanCursorResult{err: spanErr}:
+                       case <-ctx.Done():
+                       }
+                       return
+               }
+
+               // Quota OK, send cursor
                if recordBlock != nil {
                        recordBlock(bc)
                }
-               spanBlockBytes += bc.bm.uncompressedSpanSizeBytes
+               spanBlockBytes += blockSize
+               cursorCount++
 
-               if quota >= 0 && spanBlockBytes > uint64(quota) {
-                       err = fmt.Errorf("block scan quota exceeded: used %d bytes, quota is %d bytes", spanBlockBytes, quota)
+               select {
+               case out <- scanCursorResult{cursor: bc}:
+               case <-ctx.Done():
+                       releaseBlockCursor(bc)
+                       spanErr = pkgerrors.WithMessagef(ctx.Err(), "interrupt: scanned %d blocks", cursorCount)
                        return
                }

Review Comment:
   Resource accounting via protector.AcquireResource was removed; this makes 
quota checks best-effort and can race with other scans. After computing 
spanBlockBytes for the batch, acquire the resource (e.g., before sending the 
first cursor) and propagate errors; otherwise we can exceed the global budget 
under concurrency.
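
   A sketch of where that acquisition could slot into the loop above, replacing the AvailableBytes-only check. It assumes t.pm exposes AcquireResource like the sidx protector call that was removed; the matching release once the consumer drops the cursor is not shown because that API is not part of this diff.
   ```go
                // Sketch only: reserve the block's bytes before handing the cursor out,
                // instead of relying on AvailableBytes alone.
                if err := t.pm.AcquireResource(ctx, blockSize); err != nil {
                        releaseBlockCursor(bc)
                        spanErr = fmt.Errorf("cannot acquire resource: %w", err)
                        select {
                        case out <- scanCursorResult{err: spanErr}:
                        case <-ctx.Done():
                        }
                        return
                }
                spanBlockBytes += blockSize
   ```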



##########
banyand/trace/streaming_pipeline.go:
##########
@@ -457,11 +467,6 @@ func (r *sidxStreamRunner) run(out chan<- traceBatch) {
                        }
                }
 
-               if added && r.maxTraceSize > 0 && r.total >= r.maxTraceSize {
-                       r.limitReached = true
-                       break
-               }
-
                if err := r.advanceShard(shard); err != nil {

Review Comment:
   The overall maxTraceSize cap is no longer enforced (previous early breaks 
are removed), changing behavior from 'limit total trace IDs' to 'limit only 
per-batch via MaxBatchSize'. If this is unintended, reintroduce a check like 
'if r.maxTraceSize > 0 && r.total >= r.maxTraceSize { return }'; if it is 
intended, update tracing/tags and docs to reflect the removal of a global cap.



##########
banyand/trace/streaming_pipeline.go:
##########
@@ -629,261 +668,54 @@ func (t *trace) startBlockScanStage(
        parts []*part,
        qo queryOptions,
        batches <-chan traceBatch,
-       maxTraceSize int,
 ) <-chan *scanBatch {
        out := make(chan *scanBatch)
 
-       workerCount := cgroups.CPUs()
-       if workerCount < 1 {
-               workerCount = 1
-       }
-
-       jobCh := make(chan traceBatch)
-       resultCh := make(chan *scanBatch)
-
-       var workerGroup sync.WaitGroup
-       workerGroup.Add(workerCount)
-
-       t.launchBlockScanWorkers(ctx, parts, qo, workerCount, jobCh, resultCh, &workerGroup)
-
-       go func() {
-               workerGroup.Wait()
-               close(resultCh)
-       }()
-
-       go func() {
-               t.dispatchTraceBatches(ctx, batches, jobCh, maxTraceSize)
-       }()
-
        go func() {
                defer close(out)
-               t.collectScanResults(ctx, resultCh, out)
-       }()
-
-       return out
-}
-
-func (t *trace) launchBlockScanWorkers(
-       ctx context.Context,
-       parts []*part,
-       qo queryOptions,
-       workerCount int,
-       jobCh <-chan traceBatch,
-       resultCh chan<- *scanBatch,
-       wg *sync.WaitGroup,
-) {
-       for i := 0; i < workerCount; i++ {
-               go func() {
-                       defer wg.Done()
-                       for batch := range jobCh {
-                               sb := t.processTraceBatch(ctx, parts, qo, batch)
-                               if sb == nil {
-                                       continue
-                               }
 
+               for batch := range batches {
+                       if batch.err != nil {
                                select {
                                case <-ctx.Done():
-                                       if sb.err == nil {
-                                               releaseBlockCursors(sb.cursors)
-                                       }
                                        return
-                               case resultCh <- sb:
+                               case out <- &scanBatch{traceBatch: batch, err: batch.err}:
                                }
+                               continue
                        }
-               }()
-       }
-}
-
-func (t *trace) dispatchTraceBatches(
-       ctx context.Context,
-       batches <-chan traceBatch,
-       jobCh chan<- traceBatch,
-       maxTraceSize int,
-) {
-       defer close(jobCh)
-
-       total := 0
-       sequence := 0
-       limitReached := false
-
-       for batch := range batches {
-               if limitReached {
-                       continue
-               }
-
-               if batch.err != nil {
-                       batch.seq = sequence
-                       sequence++
-                       if !sendTraceBatch(ctx, jobCh, batch) {
-                               return
-                       }
-                       limitReached = true
-                       continue
-               }
 
-               if len(batch.traceIDs) == 0 {
-                       continue
-               }
-
-               var reached bool
-               batch, total, reached = limitTraceBatch(batch, maxTraceSize, total)
-               if len(batch.traceIDs) == 0 {
-                       if reached {
-                               limitReached = true
+                       // Create the cursor channel and scanBatch
+                       cursorCh := make(chan scanCursorResult)
+                       sb := &scanBatch{
+                               traceBatch: batch,
+                               cursorCh:   cursorCh,
                        }
-                       continue
-               }
-
-               batch.seq = sequence
-               sequence++
-               if !sendTraceBatch(ctx, jobCh, batch) {
-                       return
-               }
-               if reached {
-                       limitReached = true
-               }
-       }
-}
-
-func limitTraceBatch(batch traceBatch, maxTraceSize int, total int) (traceBatch, int, bool) {
-       if maxTraceSize <= 0 {
-               total += len(batch.traceIDs)
-               return batch, total, false
-       }
 
-       remaining := maxTraceSize - total
-       if remaining <= 0 {
-               batch.traceIDs = batch.traceIDs[:0]
-               return batch, total, true
-       }
-
-       if len(batch.traceIDs) > remaining {
-               batch.traceIDs = append([]string(nil), batch.traceIDs[:remaining]...)
-       }
-
-       total += len(batch.traceIDs)
-
-       return batch, total, total >= maxTraceSize
-}
-
-func sendTraceBatch(ctx context.Context, jobCh chan<- traceBatch, batch traceBatch) bool {
-       select {
-       case <-ctx.Done():
-               return false
-       case jobCh <- batch:
-               return true
-       }
-}
-
-func (t *trace) collectScanResults(
-       ctx context.Context,
-       resultCh <-chan *scanBatch,
-       out chan<- *scanBatch,
-) {
-       pending := make(map[int]*scanBatch)
-       nextSeq := 0
-
-       for {
-               select {
-               case <-ctx.Done():
-                       releasePendingBatches(pending)
-                       return
-               case sb, ok := <-resultCh:
-                       if !ok {
-                               flushPendingBatches(out, pending, nextSeq)
-                               return
-                       }
-                       if sb == nil {
-                               continue
-                       }
-                       if !t.processScanBatch(ctx, sb, pending, &nextSeq, out) {
-                               releasePendingBatches(pending)
+                       // Send batch downstream first so consumer can start reading
+                       select {
+                       case <-ctx.Done():
+                               close(cursorCh)
                                return
+                       case out <- sb:
                        }
-               }
-       }
-}
 
-func (t *trace) processScanBatch(
-       ctx context.Context,
-       sb *scanBatch,
-       pending map[int]*scanBatch,
-       nextSeq *int,
-       out chan<- *scanBatch,
-) bool {
-       if sb.err != nil {
-               releasePendingBatches(pending)
-               return sendScanBatch(ctx, out, sb)
-       }
-
-       if sb.seq == *nextSeq {
-               if !sendScanBatch(ctx, out, sb) {
-                       return false
+                       // Now scan inline and populate the channel
+                       t.scanTraceIDsInline(ctx, parts, qo, batch.traceIDs, cursorCh)
+                       close(cursorCh)
                }
-               (*nextSeq)++
-               return t.flushReadyBatches(ctx, pending, nextSeq, out)
-       }
-
-       pending[sb.seq] = sb
-       return true
-}
-
-func (t *trace) flushReadyBatches(
-       ctx context.Context,
-       pending map[int]*scanBatch,
-       nextSeq *int,
-       out chan<- *scanBatch,
-) bool {
-       for {
-               sb, exists := pending[*nextSeq]
-               if !exists {
-                       return true
-               }
-               delete(pending, *nextSeq)
-               if !sendScanBatch(ctx, out, sb) {
-                       releasePendingBatches(pending)
-                       return false
-               }
-               (*nextSeq)++
-       }
-}
-
-func flushPendingBatches(out chan<- *scanBatch, pending map[int]*scanBatch, nextSeq int) {
-       for {
-               sb, exists := pending[nextSeq]
-               if !exists {
-                       break
-               }
-               out <- sb
-               delete(pending, nextSeq)
-               nextSeq++
-       }
-}
+       }()
 
-func sendScanBatch(ctx context.Context, out chan<- *scanBatch, sb *scanBatch) bool {
-       select {
-       case <-ctx.Done():
-               if sb.err == nil {
-                       releaseBlockCursors(sb.cursors)
-               }
-               return false
-       case out <- sb:
-               return true
-       }
+       return out
 }
 
-func releasePendingBatches(pending map[int]*scanBatch) {
-       for seq, sb := range pending {
-               if sb != nil && sb.err == nil {
-                       releaseBlockCursors(sb.cursors)
-               }
-               delete(pending, seq)
-       }
+type scanCursorResult struct {
+       cursor *blockCursor
+       err    error
 }
 
-func (t *trace) scanTraceIDs(ctx context.Context, parts []*part, qo queryOptions, traceIDs []string) (result []*blockCursor, err error) {
+func (t *trace) scanTraceIDsInline(ctx context.Context, parts []*part, qo queryOptions, traceIDs []string, out chan<- scanCursorResult) {
        if len(parts) == 0 || len(traceIDs) == 0 {
-               return nil, nil
+               return
        }

Review Comment:
   Resource accounting via protector.AcquireResource was removed; this makes 
quota checks best-effort and can race with other scans. After computing 
spanBlockBytes for the batch, acquire the resource (e.g., before sending the 
first cursor) and propagate errors; otherwise we can exceed the global budget 
under concurrency.



##########
banyand/trace/query.go:
##########
@@ -99,7 +100,8 @@ func (t *trace) Query(ctx context.Context, tqo model.TraceQueryOptions) (model.T
 
        parts := t.attachSnapshots(&result, tables, qo.minTimestamp, qo.maxTimestamp)
 
-       pipelineCtx, cancel := context.WithCancel(ctx)
+       // TODO: remove this once we have a proper timeout mechanism
+       pipelineCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
        result.ctx = pipelineCtx
        result.cancel = cancel

Review Comment:
   Hardcoding a 10s timeout will cancel legitimate long-running queries and 
changes the API behavior. Consider honoring the caller's ctx as-is, or make the 
timeout configurable (e.g., via options) and default to no additional timeout.
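
   One way to make that configurable, sketched below with a hypothetical option; a zero duration preserves today's behavior of honoring the caller's ctx with no extra deadline. The helper name and the idea of surfacing it as an option are assumptions, not the PR's API.
   ```go
package main

import (
        "context"
        "fmt"
        "time"
)

// withOptionalTimeout adds a deadline only when one is configured; a zero
// duration leaves the caller's context untouched apart from cancellation.
func withOptionalTimeout(ctx context.Context, d time.Duration) (context.Context, context.CancelFunc) {
        if d > 0 {
                return context.WithTimeout(ctx, d)
        }
        return context.WithCancel(ctx)
}

func main() {
        // Hypothetical knob: default 0 means no additional timeout is applied.
        pipelineCtx, cancel := withOptionalTimeout(context.Background(), 0)
        defer cancel()
        fmt.Println(pipelineCtx.Err()) // <nil> until the caller cancels
}
   ```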



##########
banyand/trace/streaming_pipeline.go:
##########
@@ -912,76 +744,73 @@ func (t *trace) scanTraceIDs(ctx context.Context, parts []*part, qo queryOptions
 
        tstIter.init(bma, parts, sortedIDs)
        if initErr := tstIter.Error(); initErr != nil {
-               err = fmt.Errorf("cannot init tstIter: %w", initErr)
+               spanErr = fmt.Errorf("cannot init tstIter: %w", initErr)
+               select {
+               case out <- scanCursorResult{err: spanErr}:
+               case <-ctx.Done():
+               }
                return
        }
 
-       var (
-               spanBlockBytes uint64
-               hit            int
-       )
-
        quota := t.pm.AvailableBytes()
+       hit := 0
 
        for tstIter.nextBlock() {
                if hit%checkDoneEvery == 0 {
                        select {
                        case <-ctx.Done():
-                               err = pkgerrors.WithMessagef(ctx.Err(), "interrupt: scanned %d blocks, remained %d/%d parts to scan",
-                                       len(result), len(tstIter.piPool)-tstIter.idx, len(tstIter.piPool))
+                               spanErr = pkgerrors.WithMessagef(ctx.Err(), "interrupt: scanned %d blocks, remained %d/%d parts to scan",
+                                       cursorCount, len(tstIter.piPool)-tstIter.idx, len(tstIter.piPool))
                                return
                        default:
                        }
                }
                hit++
 
+               // Create block cursor and get size before checking quota
                bc := generateBlockCursor()
                p := tstIter.piPool[tstIter.idx]
                bc.init(p.p, p.curBlock, qo)
-               result = append(result, bc)
+               blockSize := bc.bm.uncompressedSpanSizeBytes
+
+               // Check if adding this block would exceed quota
+               if quota >= 0 && spanBlockBytes+blockSize > uint64(quota) {
+                       releaseBlockCursor(bc)
+                       if cursorCount > 0 {
+                               // Have results, return them successfully by just closing channel
+                               return
+                       }
+                       // No results, send error
+                       spanErr = fmt.Errorf("block scan quota exceeded: block size %d bytes, quota is %d bytes", blockSize, quota)
+                       select {
+                       case out <- scanCursorResult{err: spanErr}:
+                       case <-ctx.Done():
+                       }
+                       return
+               }

Review Comment:
   Resource accounting via protector.AcquireResource was removed; this makes 
quota checks best-effort and can race with other scans. After computing 
spanBlockBytes for the batch, acquire the resource (e.g., before sending the 
first cursor) and propagate errors; otherwise we can exceed the global budget 
under concurrency.



##########
banyand/trace/metrics.go:
##########
@@ -292,6 +310,8 @@ func (s *supplier) newMetrics(p common.Position) storage.Metrics {
                totalSyncLoopStarted:       factory.NewCounter("total_sync_loop_started"),
                totalSyncLoopFinished:      factory.NewCounter("total_sync_loop_finished"),
                totalSyncLoopErr:           factory.NewCounter("total_sync_loop_err"),
+               totalSyncLoopLatency:       factory.NewCounter("total_sync_loop_latency"),

Review Comment:
   Latency is tracked with a Counter, which loses distribution and percentile 
information; use a Histogram or Summary metric for durations (seconds) and a 
Counter for bytes. For example, expose total_sync_loop_latency_seconds as a 
Histogram.
   ```suggestion
                totalSyncLoopLatency:       factory.NewHistogram("total_sync_loop_latency", meter.DefaultLatencyBuckets...),
   ```



##########
banyand/trace/streaming_pipeline.go:
##########
@@ -629,261 +668,54 @@ func (t *trace) startBlockScanStage(
        parts []*part,
        qo queryOptions,
        batches <-chan traceBatch,
-       maxTraceSize int,
 ) <-chan *scanBatch {
        out := make(chan *scanBatch)
 
-       workerCount := cgroups.CPUs()
-       if workerCount < 1 {
-               workerCount = 1
-       }
-
-       jobCh := make(chan traceBatch)
-       resultCh := make(chan *scanBatch)
-
-       var workerGroup sync.WaitGroup
-       workerGroup.Add(workerCount)
-
-       t.launchBlockScanWorkers(ctx, parts, qo, workerCount, jobCh, resultCh, &workerGroup)
-
-       go func() {
-               workerGroup.Wait()
-               close(resultCh)
-       }()
-
-       go func() {
-               t.dispatchTraceBatches(ctx, batches, jobCh, maxTraceSize)
-       }()
-
        go func() {
                defer close(out)
-               t.collectScanResults(ctx, resultCh, out)
-       }()
-
-       return out
-}
-
-func (t *trace) launchBlockScanWorkers(
-       ctx context.Context,
-       parts []*part,
-       qo queryOptions,
-       workerCount int,
-       jobCh <-chan traceBatch,
-       resultCh chan<- *scanBatch,
-       wg *sync.WaitGroup,
-) {
-       for i := 0; i < workerCount; i++ {
-               go func() {
-                       defer wg.Done()
-                       for batch := range jobCh {
-                               sb := t.processTraceBatch(ctx, parts, qo, batch)
-                               if sb == nil {
-                                       continue
-                               }
 
+               for batch := range batches {
+                       if batch.err != nil {
                                select {
                                case <-ctx.Done():
-                                       if sb.err == nil {
-                                               releaseBlockCursors(sb.cursors)
-                                       }
                                        return
-                               case resultCh <- sb:
+                               case out <- &scanBatch{traceBatch: batch, err: batch.err}:
                                }
+                               continue
                        }
-               }()
-       }
-}
-
-func (t *trace) dispatchTraceBatches(
-       ctx context.Context,
-       batches <-chan traceBatch,
-       jobCh chan<- traceBatch,
-       maxTraceSize int,
-) {
-       defer close(jobCh)
-
-       total := 0
-       sequence := 0
-       limitReached := false
-
-       for batch := range batches {
-               if limitReached {
-                       continue
-               }
-
-               if batch.err != nil {
-                       batch.seq = sequence
-                       sequence++
-                       if !sendTraceBatch(ctx, jobCh, batch) {
-                               return
-                       }
-                       limitReached = true
-                       continue
-               }
 
-               if len(batch.traceIDs) == 0 {
-                       continue
-               }
-
-               var reached bool
-               batch, total, reached = limitTraceBatch(batch, maxTraceSize, total)
-               if len(batch.traceIDs) == 0 {
-                       if reached {
-                               limitReached = true
+                       // Create the cursor channel and scanBatch
+                       cursorCh := make(chan scanCursorResult)
+                       sb := &scanBatch{
+                               traceBatch: batch,
+                               cursorCh:   cursorCh,
                        }
-                       continue
-               }
-
-               batch.seq = sequence
-               sequence++
-               if !sendTraceBatch(ctx, jobCh, batch) {
-                       return
-               }
-               if reached {
-                       limitReached = true
-               }
-       }
-}
-
-func limitTraceBatch(batch traceBatch, maxTraceSize int, total int) (traceBatch, int, bool) {
-       if maxTraceSize <= 0 {
-               total += len(batch.traceIDs)
-               return batch, total, false
-       }
 
-       remaining := maxTraceSize - total
-       if remaining <= 0 {
-               batch.traceIDs = batch.traceIDs[:0]
-               return batch, total, true
-       }
-
-       if len(batch.traceIDs) > remaining {
-               batch.traceIDs = append([]string(nil), batch.traceIDs[:remaining]...)
-       }
-
-       total += len(batch.traceIDs)
-
-       return batch, total, total >= maxTraceSize
-}
-
-func sendTraceBatch(ctx context.Context, jobCh chan<- traceBatch, batch traceBatch) bool {
-       select {
-       case <-ctx.Done():
-               return false
-       case jobCh <- batch:
-               return true
-       }
-}
-
-func (t *trace) collectScanResults(
-       ctx context.Context,
-       resultCh <-chan *scanBatch,
-       out chan<- *scanBatch,
-) {
-       pending := make(map[int]*scanBatch)
-       nextSeq := 0
-
-       for {
-               select {
-               case <-ctx.Done():
-                       releasePendingBatches(pending)
-                       return
-               case sb, ok := <-resultCh:
-                       if !ok {
-                               flushPendingBatches(out, pending, nextSeq)
-                               return
-                       }
-                       if sb == nil {
-                               continue
-                       }
-                       if !t.processScanBatch(ctx, sb, pending, &nextSeq, out) {
-                               releasePendingBatches(pending)
+                       // Send batch downstream first so consumer can start reading
+                       select {
+                       case <-ctx.Done():
+                               close(cursorCh)
                                return
+                       case out <- sb:
                        }
-               }
-       }
-}
 
-func (t *trace) processScanBatch(
-       ctx context.Context,
-       sb *scanBatch,
-       pending map[int]*scanBatch,
-       nextSeq *int,
-       out chan<- *scanBatch,
-) bool {
-       if sb.err != nil {
-               releasePendingBatches(pending)
-               return sendScanBatch(ctx, out, sb)
-       }
-
-       if sb.seq == *nextSeq {
-               if !sendScanBatch(ctx, out, sb) {
-                       return false
+                       // Now scan inline and populate the channel
+                       t.scanTraceIDsInline(ctx, parts, qo, batch.traceIDs, cursorCh)
+                       close(cursorCh)

Review Comment:
   Scanning inline blocks the consumption of subsequent trace batches from 
'batches', serializing the pipeline and reducing throughput. Move 
scanTraceIDsInline into its own goroutine per batch so the stage continues 
consuming 'batches' and upstream producers aren't back-pressured by downstream 
scanning.
   ```suggestion
                        // Now scan in a separate goroutine to avoid blocking batch consumption
                        go func(batch traceBatch, cursorCh chan scanCursorResult) {
                                t.scanTraceIDsInline(ctx, parts, qo, batch.traceIDs, cursorCh)
                                close(cursorCh)
                        }(batch, cursorCh)
   ```



##########
banyand/trace/streaming_pipeline.go:
##########
@@ -383,48 +395,46 @@ func (r *sidxStreamRunner) prepare(instances []sidx.SIDX) error {
                })
        }
 
+       // Add shards with data to the heap
        for _, src := range sources {
                heap.Push(r.heap, src.shard)
        }
 
-       if len(sources) == 0 {
-               return nil
-       }
-
-       r.errEvents = make(chan sidxStreamError, len(sources))
+       // Create error event channel and start error forwarding goroutines
+       // for ALL error channels, even from shards without data
+       if len(allErrChannels) > 0 {
+               r.errEvents = make(chan sidxStreamError, len(allErrChannels))
 
-       for _, src := range sources {
-               if src.errCh == nil {
-                       continue
+               for _, errSrc := range allErrChannels {
+                       r.errWg.Add(1)
+                       go func(index int, ch <-chan error) {
+                               defer r.errWg.Done()
+                               forwardSIDXError(r.streamCtx, index, ch, r.errEvents)
+                       }(errSrc.idx, errSrc.errCh)
                }
-               r.errWg.Add(1)
-               go func(index int, ch <-chan error) {
-                       defer r.errWg.Done()
-                       forwardSIDXError(r.streamCtx, index, ch, r.errEvents)
-               }(src.idx, src.errCh)
-       }
 
-       go func() {
-               r.errWg.Wait()
-               close(r.errEvents)
-       }()
+               go func() {
+                       r.errWg.Wait()
+                       close(r.errEvents)
+               }()
+       }
 
        return nil
 }
 
 func (r *sidxStreamRunner) run(out chan<- traceBatch) {
-       if r.heap.Len() == 0 {
+       // Always drain error events before returning, even on early exit
+       // Cancel first to stop SIDX goroutines, then drain any pending errors
+       defer func() {
                r.cancel()
                r.drainErrorEvents(out)
+       }()
+
+       if r.heap.Len() == 0 {
                return
        }
 
        for r.heap.Len() > 0 {

Review Comment:
   The overall maxTraceSize cap is no longer enforced (previous early breaks 
are removed), changing behavior from 'limit total trace IDs' to 'limit only 
per-batch via MaxBatchSize'. If this is unintended, reintroduce a check like 
'if r.maxTraceSize > 0 && r.total >= r.maxTraceSize { return }'; if it is 
intended, update tracing/tags and docs to reflect the removal of a global cap.
   ```suggestion
        for r.heap.Len() > 0 {
                // Enforce global maxTraceSize cap
                if r.maxTraceSize > 0 && r.total >= r.maxTraceSize {
                        return
                }
   ```



##########
banyand/trace/metrics.go:
##########
@@ -163,6 +165,20 @@ func (tst *tsTable) incTotalSyncLoopErr(delta int) {
        tst.metrics.totalSyncLoopErr.Inc(float64(delta))
 }
 
+func (tst *tsTable) incTotalSyncLoopLatency(delta float64) {
+       if tst == nil || tst.metrics == nil {
+               return
+       }
+       tst.metrics.totalSyncLoopLatency.Inc(delta)
+}

Review Comment:
   Latency is tracked with a Counter, which loses distribution and percentile 
information; use a Histogram or Summary metric for durations (seconds) and a 
Counter for bytes. For example, expose total_sync_loop_latency_seconds as a 
Histogram.
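
   For illustration, the guard-style helper above would then record a sample rather than increment a counter. A sketch, assuming the meter's Histogram type exposes an Observe method (verify against the project's meter package):
   ```go
// Hypothetical counterpart of incTotalSyncLoopLatency once the field is a
// Histogram; the sample is the sync-loop duration in seconds.
func (tst *tsTable) observeTotalSyncLoopLatency(seconds float64) {
        if tst == nil || tst.metrics == nil {
                return
        }
        tst.metrics.totalSyncLoopLatency.Observe(seconds)
}
   ```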



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
