Copilot commented on code in PR #1168:
URL: 
https://github.com/apache/skywalking-banyandb/pull/1168#discussion_r3386149501


##########
banyand/internal/storage/segment.go:
##########
@@ -733,6 +727,28 @@ func parseSegmentTime(value string, unit IntervalUnit) 
(time.Time, error) {
        panic("invalid interval unit")
 }
 
+// ParseSegmentTime parses a segment-directory suffix back into its start time
+// for the given interval unit, the inverse of the suffix formatting used when 
a
+// segment directory is named. Callers outside this package use it to map the
+// suffixes returned by VisitSegmentsInTimeRange to the segment instants a
+// visitor observed.
+func ParseSegmentTime(suffix string, rule IntervalRule) (time.Time, error) {
+       return parseSegmentTime(suffix, rule.Unit)
+}
+
+// FormatSegmentTime formats a segment start time into its directory suffix for
+// the given interval rule, the inverse of ParseSegmentTime. The returned 
suffix
+// excludes the "seg-" directory prefix.
+func FormatSegmentTime(t time.Time, rule IntervalRule) string {
+       switch rule.Unit {
+       case HOUR:
+               return t.Format(hourFormat)
+       case DAY:
+               return t.Format(dayFormat)
+       }
+       panic("invalid interval unit")
+}

Review Comment:
   `ParseSegmentTime` parses suffixes in `time.Local`, but `FormatSegmentTime` 
formats using whatever location is carried by `t`. If callers pass a UTC (or 
otherwise non-local) time, the formatted suffix may not round-trip through 
`ParseSegmentTime`, breaking the documented “inverse” relationship and making 
suffixes timezone-dependent.
   
   Normalizing `t` to `time.Local` before formatting keeps formatting/parsing 
consistent.



##########
banyand/backup/lifecycle/migration_error_test.go:
##########
@@ -0,0 +1,296 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package lifecycle
+
+import (
+       "encoding/json"
+       "fmt"
+       "sync"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+// TestFormatIntervalRule pins the compact interval rendering for the report's
+// interval field across day and hour granularities.
+func TestFormatIntervalRule(t *testing.T) {
+       assert.Equal(t, "5d", formatIntervalRule(storage.IntervalRule{Unit: 
storage.DAY, Num: 5}))
+       assert.Equal(t, "1d", formatIntervalRule(storage.IntervalRule{Unit: 
storage.DAY, Num: 1}))
+       assert.Equal(t, "1h", formatIntervalRule(storage.IntervalRule{Unit: 
storage.HOUR, Num: 1}))
+       assert.Equal(t, "12h", formatIntervalRule(storage.IntervalRule{Unit: 
storage.HOUR, Num: 12}))
+}
+
+// TestSegmentErrorLocation pins the source segment directory name + interval
+// derivation, the inverse of storage.ParseSegmentTime, for day and hour units.
+func TestSegmentErrorLocation(t *testing.T) {
+       start := time.Date(2026, 6, 1, 0, 0, 0, 0, time.UTC)
+       dayTR := &timestamp.TimeRange{Start: start, End: start.Add(5 * 24 * 
time.Hour)}
+       seg, interval := segmentErrorLocation(dayTR, storage.IntervalRule{Unit: 
storage.DAY, Num: 5})
+       assert.Equal(t, "seg-20260601", seg)
+       assert.Equal(t, "5d", interval)
+
+       hourTR := &timestamp.TimeRange{Start: start, End: start.Add(time.Hour)}
+       seg, interval = segmentErrorLocation(hourTR, storage.IntervalRule{Unit: 
storage.HOUR, Num: 1})
+       assert.Equal(t, "seg-2026060100", seg)
+       assert.Equal(t, "1h", interval)

Review Comment:
   This test uses `time.UTC` for the start time, but the segment suffix logic 
in storage is defined in terms of `time.Local` (`ParseSegmentTime` uses 
`time.Local`, and `FormatSegmentTime` should match). In non-UTC environments 
the UTC date/hour can differ from the local one, making the expected suffixes 
here timezone-dependent.
   
   Construct the start time in `time.Local` so the expectations are stable 
across environments.



##########
banyand/backup/lifecycle/row_replay_pipeline.go:
##########
@@ -146,33 +175,116 @@ type batchSender struct {
        client    queue.Client
        publisher queue.BatchPublisher
        pipeline  *confirmPipeline
+       pool      *marshalBufferPool
        batch     []bus.Message
-       topic     bus.Topic
-       timeout   time.Duration
-       batchSize int
+       lent      [][]byte
+       // skippedDetail locates a bounded sample of skipped series (part, 
seriesID,
+       // reason) so the migration report can point an operator at the source 
data,
+       // not just a count. Capped at maxSkipDetail to bound memory.
+       skippedDetail []skipError
+       topic         bus.Topic
+       timeout       time.Duration
+       maxRows       int
+       maxBytes      int
+       inflightBytes int
+       // Flush-reason tallies: which cap forced each batch out. A flush 
triggered by
+       // both caps at once is attributed to bytes (the memory-protecting cap 
that
+       // fires early in the big-body cluster). tailFlushes counts the trailing
+       // end-of-part flush that neither cap triggered.
+       rowLimitFlushes  int
+       byteLimitFlushes int
+       tailFlushes      int
+       // skippedRows counts rows dropped because their series could not be 
resolved
+       // (errSkipSeries: a part block referencing a series absent from the 
segment's
+       // sidx that could not be rebuilt from the part's columns). The part 
still
+       // completes; this surfaces how much a source-data gap dropped instead 
of
+       // aborting the whole migration.
+       skippedRows int
+}
+
+// maxSkipDetail bounds how many distinct skipped-series locations are retained
+// for the report; the full count still accrues in skippedRows.
+const maxSkipDetail = 200
+
+// recordSkip tallies one skipped series and, up to the cap, retains its 
location.
+func (s *batchSender) recordSkip(err error) {
+       s.skippedRows++
+       if c := asSkipError(err); c != nil && len(s.skippedDetail) < 
maxSkipDetail {
+               s.skippedDetail = append(s.skippedDetail, *c)
+       }
 }

Review Comment:
   `skippedDetail` is intended to hold a bounded sample of *skipped series 
locations*, but `recordSkip` appends an entry for every skipped row. For series 
with many rows, this quickly fills the sample with duplicates of the same 
(partPath, seriesID, reason), reducing its diagnostic value.
   
   Since rows for a series are emitted contiguously, de-duplicate against the 
last recorded entry so the sample is effectively per-series while keeping the 
implementation lightweight.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to