Copilot commented on code in PR #1138:
URL: 
https://github.com/apache/skywalking-banyandb/pull/1138#discussion_r3278060740


##########
banyand/cmd/migration/verify.go:
##########
@@ -0,0 +1,329 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package main
+
+import (
+       "context"
+       "fmt"
+       "os"
+       "os/signal"
+       "strings"
+       "syscall"
+       "text/tabwriter"
+
+       "github.com/spf13/cobra"
+
+       "github.com/apache/skywalking-banyandb/banyand/measure"
+)
+
+func newVerifyCmd() *cobra.Command {
+       var configPath string
+
+       cmd := &cobra.Command{
+               Use:   "verify",
+               Short: "Inspect a copy run: per-(entry, group) source vs target 
row counts, target segment grid alignment, sidx doc counts",
+               Long: `verify reads the same plan.yaml that the copy run 
consumed and walks
+each (entry, group) read-only:
+
+  - sums source row count by opening every src/seg-*/shard-*/<partID>
+    and totalling partMetadata.TotalCount
+  - enumerates target/seg-*/, opens each part and the per-seg union sidx
+  - flags whether each target seg's start time aligns to the entry
+    stage's SegmentInterval grid (IntervalRule.Standard)
+  - prints all numbers (source rows, target rows per seg, sidxDocs,
+    aligned y/n) to stdout for the operator to inspect
+
+verify never fails on mismatch — it just reports. Use it after a copy
+run, optionally after the data-copy ↔ data swap, to confirm the
+migration result.`,
+               RunE: func(_ *cobra.Command, _ []string) error {
+                       plan, err := LoadCopyPlan(configPath)
+                       if err != nil {
+                               return err
+                       }
+                       // staging dir is unused by verify (read-only), pass "".
+                       cfg := plan.ToDirectCopyConfig("")
+
+                       ctx, stop := signal.NotifyContext(context.Background(),
+                               os.Interrupt, syscall.SIGTERM)
+                       defer stop()
+
+                       tally := &verifyTally{}
+                       runErr := measure.MigrationVerify(ctx, cfg, func(r 
measure.EntryGroupReport) {
+                               printOneReport(r)
+                               tally.absorb(r)
+                       })
+                       tally.printSummary()
+                       return runErr
+               },
+       }
+
+       cmd.Flags().StringVar(&configPath, "copy-config", "",
+               "path to the YAML migration copy plan that was used for 
`migration copy` (required)")
+       _ = cmd.MarkFlagRequired("copy-config")
+       return cmd
+}
+
+// verifyTally accumulates the per-(node, group) findings the callback
+// stream emits so we can print a single roll-up SUMMARY block at the
+// end of the run — the per-report stream prints itself.
+type verifyTally struct {
+       mismatches     []verifyMismatch
+       coverage       map[string]map[string]coverageState // node → group → 
(src/tgt presence)
+       nodeOrder      []string                            // first-seen 
ordering
+       groupOrder     []string                            // first-seen 
ordering
+       srcRowsTotal   uint64
+       tgtRowsTotal   uint64
+       segsTotal      int
+       segsMisaligned int
+}
+
+// coverageState records whether SOURCE and TARGET independently hold
+// any rows for one (node, group) pair. Four combinations exist, each
+// surfaced by a distinct token in the coverage table:
+//
+//     src=true, tgt=true   → "✓"  (both present, normal copy success)
+//     src=true, tgt=false  → "S"  (src has data but target is empty — copy 
lost this group)
+//     src=false, tgt=true  → "T"  (target has data without a source — orphan 
/ leftover)
+//     src=false, tgt=false → "--" (neither — PVC hash sharding excluded this 
group, normal)
+type coverageState struct {
+       src bool
+       tgt bool
+}
+
+// verifyMismatch records one (node, group) where source row count
+// did NOT equal target row count — surfaced in the SUMMARY block so
+// the operator immediately sees which PVCs / groups need follow-up.
+type verifyMismatch struct {
+       Stage    string
+       NodeName string
+       Group    string
+       SrcRows  uint64
+       TgtRows  uint64
+}
+
+// entryNodeName picks the first node from EntryNodes (entries usually
+// reference exactly one node); falls back to the entry stage when the
+// list is empty (e.g. backup-mode plans without explicit nodes).
+func entryNodeName(r measure.EntryGroupReport) string {
+       if len(r.EntryNodes) > 0 {
+               return r.EntryNodes[0]
+       }
+       return r.EntryStage
+}
+
+func (t *verifyTally) absorb(r measure.EntryGroupReport) {
+       node := entryNodeName(r)
+       if t.coverage == nil {
+               t.coverage = make(map[string]map[string]coverageState)
+       }
+       if _, ok := t.coverage[node]; !ok {
+               t.coverage[node] = make(map[string]coverageState)
+               t.nodeOrder = append(t.nodeOrder, node)
+       }
+       if _, ok := indexOf(t.groupOrder, r.Group); !ok {
+               t.groupOrder = append(t.groupOrder, r.Group)
+       }
+
+       t.srcRowsTotal += r.SrcRows
+       var tgtRows uint64
+       for _, s := range r.TargetSegs {
+               tgtRows += s.Rows
+               if !s.Aligned {
+                       t.segsMisaligned++
+               }
+       }
+       t.tgtRowsTotal += tgtRows
+       t.segsTotal += len(r.TargetSegs)
+
+       t.coverage[node][r.Group] = coverageState{
+               src: r.SrcRows > 0,
+               tgt: tgtRows > 0,
+       }
+
+       if tgtRows != r.SrcRows {
+               t.mismatches = append(t.mismatches, verifyMismatch{
+                       Stage:    r.EntryStage,
+                       NodeName: node,
+                       Group:    r.Group,
+                       SrcRows:  r.SrcRows,
+                       TgtRows:  tgtRows,
+               })
+       }
+}
+
+func indexOf(xs []string, v string) (int, bool) {
+       for i, x := range xs {
+               if x == v {
+                       return i, true
+               }
+       }
+       return 0, false
+}
+
+func (t *verifyTally) printSummary() {
+       fmt.Println("== SUMMARY ==")
+       t.printCoverageTable()
+       fmt.Println()
+       fmt.Printf("  target segments                : %d\n", t.segsTotal)
+       fmt.Printf("  target segments misaligned     : %d\n", t.segsMisaligned)
+       fmt.Printf("  source rows total              : %d\n", t.srcRowsTotal)
+       fmt.Printf("  target rows total              : %d\n", t.tgtRowsTotal)
+       if len(t.mismatches) == 0 {
+               if t.tgtRowsTotal > t.srcRowsTotal {
+                       fmt.Printf("  diff (tgt - src)               : %+d 
(UNEXPECTED — target has MORE rows than source)\n",
+                               int64(t.tgtRowsTotal)-int64(t.srcRowsTotal))
+               } else {
+                       fmt.Println("  src == tgt")
+               }
+               return
+       }
+       fmt.Println()
+       fmt.Println("  row-count mismatches (tgt - src, negative = rows dropped 
by copy):")
+       tw := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
+       fmt.Fprintln(tw, "    stage\tnode\tgroup\tsrc\ttgt\tdiff")
+       var total int64
+       for _, m := range t.mismatches {
+               diff := int64(m.TgtRows) - int64(m.SrcRows)
+               total += diff
+               fmt.Fprintf(tw, "    %s\t%s\t%s\t%d\t%d\t%+d\n",
+                       m.Stage, m.NodeName, m.Group, m.SrcRows, m.TgtRows, 
diff)
+       }
+       fmt.Fprintf(tw, "    %s\t\t\t\t\t%+d\n", "TOTAL", total)
+       _ = tw.Flush()
+       if total < 0 {
+               fmt.Println()
+               fmt.Println("  note: rows dropped are caused by slow-path 
mustInitFromDataPoints")
+               fmt.Println("        deduping (seriesID, timestamp) within each 
chunk flush — banyandb's")
+               fmt.Println("        merger can leave duplicated boundary rows 
in source parts, and")
+               fmt.Println("        only the latest version of each (sid, ts) 
survives in target.")
+               fmt.Println("        Pass --detail to list the exact missing 
(sid, ts, version) rows.")

Review Comment:
   The summary note tells operators to "Pass --detail" to list missing rows, 
but the verify command does not define a --detail flag. This will confuse 
users; update the message to point to the existing `analyze` subcommand (or 
implement a real --detail flag).
   



##########
banyand/measure/migration_schema.go:
##########
@@ -0,0 +1,420 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "context"
+       "fmt"
+       "os"
+       "path/filepath"
+       "strings"
+
+       "github.com/blugelabs/bluge"
+       "github.com/pkg/errors"
+       "google.golang.org/protobuf/encoding/protojson"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       backupsnapshot 
"github.com/apache/skywalking-banyandb/banyand/backup/snapshot"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+)
+
+// errSchemaPropertyMissing signals "no schema-property/_schema under
+// backupDir". Returned by findSchemaPropertyRoot when a caller asks for
+// a backup-tree discovery but the snapshot omitted the catalog.
+var errSchemaPropertyMissing = errors.New("schema-property catalog not found 
in backup")
+
+// schemaPropDoc is one decoded doc emitted by walkSchemaPropertyShard;
+// the caller decides whether the kind / group matches what it wants.
+type schemaPropDoc struct {
+       propID     string
+       kindName   string // "measure" / "group" / "stream" / ...
+       group      string // empty for non-measure docs
+       sourceJSON string // embedded protobuf JSON ready for kind-specific 
Unmarshal
+       modRev     int64
+       deleted    bool
+}
+
+// walkSchemaPropertyShard opens one shard of the backup's
+// schema-property bluge index and invokes visit() for each doc.
+// Visitors filter on kindName and fold into their own dedup tables.
+func walkSchemaPropertyShard(shardPath string, visit func(schemaPropDoc) 
error) error {
+       reader, err := bluge.OpenReader(bluge.DefaultConfig(shardPath))
+       if err != nil {
+               return fmt.Errorf("open bluge reader: %w", err)
+       }
+       defer func() { _ = reader.Close() }()
+       dmi, err := reader.Search(context.Background(),
+               bluge.NewTopNSearch(schemaSearchSize, bluge.NewMatchAllQuery()))
+       if err != nil {
+               return fmt.Errorf("search schema docs: %w", err)
+       }
+       for {
+               next, err := dmi.Next()
+               if err != nil {
+                       return fmt.Errorf("iterate schema docs: %w", err)
+               }
+               if next == nil {
+                       return nil
+               }
+               var sourceBytes []byte
+               var deleted bool
+               if err := next.VisitStoredFields(func(field string, value 
[]byte) bool {
+                       switch field {
+                       case schemaSourceField:
+                               sourceBytes = append([]byte(nil), value...)
+                       case schemaDeletedTag:
+                               if len(value) > 0 {
+                                       deleted = true
+                               }
+                       }
+                       return true
+               }); err != nil {
+                       return fmt.Errorf("visit schema doc: %w", err)
+               }
+               if len(sourceBytes) == 0 {
+                       continue
+               }
+               var prop propertyv1.Property
+               if err := protojson.Unmarshal(sourceBytes, &prop); err != nil {
+                       continue

Review Comment:
   walkSchemaPropertyShard silently ignores protojson.Unmarshal errors for the 
outer Property wrapper, which can make schema/catalog corruption look like 
"missing schema" later with less actionable errors. Consider returning an error 
(or at least logging/annotating it with the doc id/shard path) so operators 
immediately see the root cause.
   



##########
banyand/cmd/migration/copy.go:
##########
@@ -0,0 +1,194 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package main
+
+import (
+       "context"
+       "fmt"
+       "os"
+       "os/signal"
+       "runtime"
+       "runtime/debug"
+       "strconv"
+       "syscall"
+       "time"
+
+       "github.com/spf13/cobra"
+
+       "github.com/apache/skywalking-banyandb/banyand/measure"
+       "github.com/apache/skywalking-banyandb/pkg/cgroups"
+)
+
+func newCopyCmd() *cobra.Command {
+       var configPath string
+
+       cmd := &cobra.Command{
+               Use:   "copy",
+               Short: "Direct-file copy of measure parts into local measure 
root with row-level grid alignment",
+               Long: `copy walks every source part, reads each row's actual 
timestamp, and routes
+the row to the grid-aligned target segment via SegmentInterval.Standard.
+A single source part can fan out to multiple target segments (one new
+part each).
+
+The run is fully driven by a YAML config (--copy-config). One config
+carries one source (backup snapshot or live PVC mounts), one group list,
+and one or more (stage, target) entries — schemas and per-group union
+sidx are loaded/built once and reused across every entry.
+
+Per group, a single union sidx is built in the staging directory by
+deduplicating every source seg-*/sidx doc by SeriesID; that union sidx
+is then broadcast (byte-copied) into every aligned target segment that
+received any rows. A fresh .snp is written per (target segment, shard)
+so tsTable.loadSnapshot picks up the new parts on next startup.
+
+Schemas (measure tag families + IndexMode bit) and group resource
+opts (SegmentInterval per LifecycleStage) are read directly from the
+source's schema-property bluge catalog — no liaison access is needed.
+Groups containing any IndexMode measure are rejected up front, since
+their fields live inside sidx and the broadcast strategy would corrupt
+cross-node deduplication at the liaison.
+
+The staging directory is removed in full when the run finishes
+(success or failure). Run inside a data pod whose BanyanDB process is
+NOT writing to any of the entry targets. Source and target shard
+topology must match.`,
+               RunE: func(_ *cobra.Command, _ []string) error {
+                       applyGoMemLimit()
+
+                       plan, err := LoadCopyPlan(configPath)
+                       if err != nil {
+                               return err
+                       }
+                       stagingDir, err := PrepareStagingDir(plan.StagingDir)
+                       if err != nil {
+                               return err
+                       }
+                       defer func() {
+                               if rmErr := os.RemoveAll(stagingDir); rmErr != 
nil {
+                                       fmt.Fprintf(os.Stderr,
+                                               "warning: failed to clean 
staging dir %s: %v\n",
+                                               stagingDir, rmErr)
+                               }
+                       }()
+
+                       cfg := plan.ToDirectCopyConfig(stagingDir)
+
+                       ctx, stop := signal.NotifyContext(context.Background(),
+                               os.Interrupt, syscall.SIGTERM)
+                       defer stop()
+
+                       res, runErr := measure.MigrationCopy(ctx, cfg)
+                       printCopyResult(res)
+                       return runErr
+               },
+       }
+
+       cmd.Flags().StringVar(&configPath, "copy-config", "",
+               "path to the YAML migration copy plan (required)")
+       _ = cmd.MarkFlagRequired("copy-config")
+       return cmd
+}
+
+func printCopyResult(res measure.DirectCopyResult) {
+       fmt.Println("DONE in", res.Duration)
+       fmt.Printf("   target segments   : %d\n", res.Segments)
+       fmt.Printf("   source parts      : %d\n", res.SourceParts)
+       fmt.Printf("   target mem-parts  : %d (pre-merge; banyandb's merge loop 
will compact)\n", res.TargetParts)
+       fmt.Printf("   rows copied       : %d\n", res.Rows)
+       fmt.Printf("   bytes written     : %d\n", res.Bytes)
+       fmt.Printf("   fast-path parts   : %d\n", measure.FastPathHits())
+       fmt.Printf("   slow-path parts   : %d\n", measure.SlowPathHits())
+       fmt.Printf("   slow-path rows    : %d\n", measure.SlowPathRows())
+}
+
+// applyGoMemLimit auto-sets the Go runtime's heap soft-cap (GOMEMLIMIT)
+// from the process cgroup memory limit when the operator did not pass
+// one explicitly.
+//
+// Why this is necessary:
+//   - Go's runtime sizes the heap from live set + GOGC by default; it
+//     does NOT read cgroup limits on its own. On a memory-constrained
+//     pod a momentary heap spike during slow-path zstd flush can sail
+//     right past the pod limit and trip the kernel OOMKiller (or
+//     macOS jetsam on kind), wiping the whole migration mid-run.
+//   - Historical evidence from this codebase: an unconstrained run on
+//     a 16 GiB GKE pod OOM'd at 99 % through the warm tier; raising
+//     the limit to 20 GiB AND setting GOMEMLIMIT to 17 GiB (85 %) was
+//     the fix. The shared flush-pool + Phase A union-sidx refactor
+//     reduced peak memory, but no run has been validated WITHOUT
+//     GOMEMLIMIT post-refactor, so we still ship the safety net.
+//
+// Why we don't rely on operators setting GOMEMLIMIT manually:
+//   - Go 1.19+ honors a GOMEMLIMIT env at startup, so an operator who
+//     remembered to set it gets the same effect. This helper is a UX
+//     fallback for the common case where it was forgotten — we read
+//     the cgroup limit (works on kind, GKE, plain docker run) and
+//     apply 85 % of it. The 15 % headroom is for the runtime's own
+//     overhead + the Linux page cache.
+//
+// Resolution order: GOMEMLIMIT env wins (we just log and return);
+// otherwise read cgroup limit and call debug.SetMemoryLimit; otherwise
+// log "not set" and let the process run uncapped (bare-host case where
+// the operator presumably has enough RAM).
+func applyGoMemLimit() {
+       if raw := os.Getenv("GOMEMLIMIT"); raw != "" {
+               // Honor whatever the env already set — Go's runtime parsed it
+               // at startup. Just log.
+               fmt.Fprintf(os.Stdout, "[migration] GOMEMLIMIT honored from 
env: %s\n", raw)
+               return
+       }
+       limit, err := cgroups.MemoryLimit()
+       if err != nil || limit <= 0 {
+               // Fall back to 85% of host total memory. cgroups.MemoryLimit
+               // returns -1 for "max" (unlimited) on a non-containerised host.
+               var ms runtime.MemStats
+               runtime.ReadMemStats(&ms)
+               if ms.Sys == 0 {
+                       fmt.Fprintf(os.Stdout, "[migration] GOMEMLIMIT not set 
(no cgroup limit, runtime.MemStats unavailable)\n")
+                       return
+               }
+               // MemStats.Sys reports OS memory reserved for the runtime, not
+               // host RAM. As a heuristic we don't set a limit here.
+               fmt.Fprintf(os.Stdout, "[migration] GOMEMLIMIT not set (cgroup 
limit unavailable; running on bare host)\n")
+               return

Review Comment:
   The comment says this branch "Fall back[s] to 85% of host total memory", but 
the implementation reads runtime.MemStats.Sys (not host RAM) and then 
intentionally does not set any limit. Please either implement an actual 
host-memory fallback (e.g., via sysinfo) or adjust the comment to match the 
current behavior.



##########
banyand/measure/migration_verify.go:
##########
@@ -0,0 +1,405 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "context"
+       "encoding/json"
+       "errors"
+       "fmt"
+       "os"
+       "path/filepath"
+       "sort"
+       "strconv"
+       "strings"
+       "time"
+
+       "github.com/blugelabs/bluge"
+
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+)
+
+// SegmentReport summarizes one `seg-*` directory under a measure
+// group root. Populated by EnumerateGroupTarget for the migration
+// verify CLI.
+//
+// Aligned is true only when ALL of these hold:
+//   - the dir name parses cleanly into a start time;
+//   - start = IntervalRule.Standard(start) (start is on the grid);
+//   - <seg>/metadata exists, is well-formed JSON, and carries a non-empty 
endTime;
+//   - end = IntervalRule.NextTime(start) (the segment spans exactly one 
bucket);
+//   - the inclusive last instant (end - 1ns) standardizes back to start (start
+//     and end fall in the same IntervalRule bucket).
+type SegmentReport struct {
+       StartTime    time.Time
+       EndTime      time.Time
+       Seg          string
+       Rows         uint64
+       SidxDocCount uint64
+       Shards       int
+       Parts        int
+       Aligned      bool
+       SidxOpened   bool
+}
+
+// EntryGroupReport aggregates source row count + target per-seg report
+// for one (entry, group) pair. Both source and target are read-only;
+// the verify command never mutates the dataset.
+type EntryGroupReport struct {
+       Group       string
+       EntryStage  string
+       EntryTarget string
+       TargetGroup string
+       EntryNodes  []string
+       SrcRoots    []string
+       TargetSegs  []SegmentReport
+       SrcRows     uint64
+       SrcParts    int
+}
+
+// VerifyShardParts reads <shardDir>'s newest `.snp`, confirms every
+// listed partID has an on-disk dir, opens each part, and returns the
+// sum of partMetadata.TotalCount plus the part count.
+//
+// Errors (instead of t.Fatalf) so this helper is usable from a CLI.
+func VerifyShardParts(shardDir string, fileSystem fs.FileSystem) (uint64, int, 
error) {
+       entries, err := os.ReadDir(shardDir)
+       if err != nil {
+               return 0, 0, fmt.Errorf("read shard: %w", err)
+       }
+       var snpPath string
+       for _, e := range entries {
+               if !e.IsDir() && strings.HasSuffix(e.Name(), 
directCopySnpSuffix) {
+                       candidate := filepath.Join(shardDir, e.Name())
+                       if snpPath == "" || candidate > snpPath {
+                               snpPath = candidate
+                       }
+               }
+       }
+       if snpPath == "" {
+               return 0, 0, fmt.Errorf("no .snp file under %s", shardDir)
+       }
+       snpRaw, err := os.ReadFile(snpPath)
+       if err != nil {
+               return 0, 0, fmt.Errorf("read .snp: %w", err)
+       }
+       var partNames []string
+       if err := json.Unmarshal(snpRaw, &partNames); err != nil {
+               return 0, 0, fmt.Errorf("parse .snp: %w", err)
+       }

Review Comment:
   VerifyShardParts takes an fs.FileSystem but uses os.ReadDir/os.ReadFile and 
filepath directly for directory listing and .snp reads. This makes the 
fs.FileSystem parameter only partially effective and prevents reuse with 
non-local/test filesystems; consider switching these reads to 
fileSystem.ReadDir/fileSystem.Read (and use os only in a LocalFileSystem 
adapter), or drop the parameter to avoid a misleading API.



##########
CHANGES.md:
##########
@@ -54,6 +54,7 @@ Release Notes.
   - Fix the `notifiedModRevision` watermark advancement in 
`SchemaRegistry.processInitialResourceFromProperty`, `handleWatchEvent` (DELETE 
branch), and `handleDeletion`. Previously `AdvanceNotified` was gated on 
`cache.Update` / `cache.Delete` returning true, but those methods compare 
`latestUpdateAt` (property timestamp) while the watermark tracks `modRevision` 
(etcd revision). When the property timestamp is stale (e.g. a no-op Update that 
doesn't change the measure spec), the cache rejects the entry and the watermark 
cannot advance, causing `AwaitRevisionApplied(R)` to block forever even though 
the event has been fully processed. `AdvanceNotified` now fires unconditionally 
whenever an event reaches the processing stage, regardless of cache mutation 
outcome.
   - Fix the `modRevision` contract on no-op Update RPCs 
(`MeasureRegistryService.Update`, etc.). Previously `updateResource` detected 
unchanged content via `CheckerMap` and short-circuited without writing to the 
property store, but the caller had already fabricated `modRevision = 
time.Now().UnixNano()` and returned it. The returned revision never appeared in 
the property watch stream, so `AwaitRevisionApplied(R)` would hang. 
`updateResource` now returns `(int64, error)` — the existing property's 
`modRevision` for no-op updates, the new revision for real updates — so callers 
always return a revision the barrier can observe.
   - Add end-to-end observability for liaison internal queue pipelines with 
per-topic metrics for queue_sub and queue_pub, along with Grafana panels and 
troubleshooting docs.
+  - Introduce measure migration tool. 

Review Comment:
   Trailing whitespace at end of the new CHANGES entry; please remove it to 
keep the changelog clean.
   



##########
banyand/cmd/migration/verify.go:
##########
@@ -0,0 +1,329 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package main
+
+import (
+       "context"
+       "fmt"
+       "os"
+       "os/signal"
+       "strings"
+       "syscall"
+       "text/tabwriter"
+
+       "github.com/spf13/cobra"
+
+       "github.com/apache/skywalking-banyandb/banyand/measure"
+)
+
+func newVerifyCmd() *cobra.Command {
+       var configPath string
+
+       cmd := &cobra.Command{
+               Use:   "verify",
+               Short: "Inspect a copy run: per-(entry, group) source vs target 
row counts, target segment grid alignment, sidx doc counts",
+               Long: `verify reads the same plan.yaml that the copy run 
consumed and walks
+each (entry, group) read-only:
+
+  - sums source row count by opening every src/seg-*/shard-*/<partID>
+    and totalling partMetadata.TotalCount
+  - enumerates target/seg-*/, opens each part and the per-seg union sidx
+  - flags whether each target seg's start time aligns to the entry
+    stage's SegmentInterval grid (IntervalRule.Standard)
+  - prints all numbers (source rows, target rows per seg, sidxDocs,
+    aligned y/n) to stdout for the operator to inspect
+
+verify never fails on mismatch — it just reports. Use it after a copy
+run, optionally after the data-copy ↔ data swap, to confirm the
+migration result.`,
+               RunE: func(_ *cobra.Command, _ []string) error {
+                       plan, err := LoadCopyPlan(configPath)
+                       if err != nil {
+                               return err
+                       }
+                       // staging dir is unused by verify (read-only), pass "".
+                       cfg := plan.ToDirectCopyConfig("")
+
+                       ctx, stop := signal.NotifyContext(context.Background(),
+                               os.Interrupt, syscall.SIGTERM)
+                       defer stop()
+
+                       tally := &verifyTally{}
+                       runErr := measure.MigrationVerify(ctx, cfg, func(r 
measure.EntryGroupReport) {
+                               printOneReport(r)
+                               tally.absorb(r)
+                       })
+                       tally.printSummary()
+                       return runErr
+               },
+       }
+
+       cmd.Flags().StringVar(&configPath, "copy-config", "",
+               "path to the YAML migration copy plan that was used for 
`migration copy` (required)")
+       _ = cmd.MarkFlagRequired("copy-config")
+       return cmd
+}
+
+// verifyTally accumulates the per-(node, group) findings the callback
+// stream emits so we can print a single roll-up SUMMARY block at the
+// end of the run — the per-report stream prints itself.
+type verifyTally struct {
+       mismatches     []verifyMismatch
+       coverage       map[string]map[string]coverageState // node → group → 
(src/tgt presence)
+       nodeOrder      []string                            // first-seen 
ordering
+       groupOrder     []string                            // first-seen 
ordering
+       srcRowsTotal   uint64
+       tgtRowsTotal   uint64
+       segsTotal      int
+       segsMisaligned int
+}
+
+// coverageState records whether SOURCE and TARGET independently hold
+// any rows for one (node, group) pair. Four combinations exist, each
+// surfaced by a distinct token in the coverage table:
+//
+//     src=true, tgt=true   → "✓"  (both present, normal copy success)
+//     src=true, tgt=false  → "S"  (src has data but target is empty — copy 
lost this group)
+//     src=false, tgt=true  → "T"  (target has data without a source — orphan 
/ leftover)
+//     src=false, tgt=false → "--" (neither — PVC hash sharding excluded this 
group, normal)
+type coverageState struct {
+       src bool
+       tgt bool
+}
+
+// verifyMismatch records one (node, group) where source row count
+// did NOT equal target row count — surfaced in the SUMMARY block so
+// the operator immediately sees which PVCs / groups need follow-up.
+type verifyMismatch struct {
+       Stage    string
+       NodeName string
+       Group    string
+       SrcRows  uint64
+       TgtRows  uint64
+}
+
+// entryNodeName picks the first node from EntryNodes (entries usually
+// reference exactly one node); falls back to the entry stage when the
+// list is empty (e.g. backup-mode plans without explicit nodes).
+func entryNodeName(r measure.EntryGroupReport) string {
+       if len(r.EntryNodes) > 0 {
+               return r.EntryNodes[0]
+       }
+       return r.EntryStage
+}

Review Comment:
   verifyTally's summary is labeled "per (node, group)", but entryNodeName 
collapses EntryNodes to only the first element. If an entry includes multiple 
nodes (allowed by the plan schema, and used by the backup-mode example), the 
coverage table and mismatch list will attribute aggregated src/tgt rows to a 
single node and become misleading. Consider either enforcing 1 node per entry 
for verify, or changing the summary dimension to be per-entry (stage/target) 
rather than per-node, or splitting the report per node.



##########
banyand/measure/migration_analyze.go:
##########
@@ -0,0 +1,928 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "fmt"
+       "os"
+       "path/filepath"
+       "sort"
+       "strconv"
+       "strings"
+       "sync/atomic"
+       "time"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/pkg/bytes"
+       "github.com/apache/skywalking-banyandb/pkg/compress/zstd"
+       "github.com/apache/skywalking-banyandb/pkg/encoding"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+)
+
+// AnalyzeGroupResult is the row-level summary `migration analyze`
+// returns for one (entry, group): how many physical rows are on disk
+// versus how many distinct (seriesID, timestamp) keys they describe.
+//
+// Two diff numbers matter:
+//   - DuplicateRows is the GLOBAL diff (cross-part dups). These are
+//     NOT dropped by migration copy because slow-path processes one
+//     source part at a time and never cross-merges buckets.
+//   - PerPartDupRows is the WITHIN-part diff summed over every source
+//     part. This IS the count slow-path mustInitFromDataPoints would
+//     drop on flush — it should match `verify`'s src-tgt diff.
+type AnalyzeGroupResult struct {
+       SamplesByVersion   []AnalyzeKeyMulti
+       PerPartDups        []AnalyzePerPart
+       TotalRows          uint64
+       UniqueKeys         uint64
+       DuplicateRows      uint64
+       PerPartDupRows     uint64
+       KeysWithDuplicates uint64 // count of (sid, ts) keys that appear in >1 
physical rows
+       PartsScanned       int
+}
+
+// AnalyzePerPart is one row in the per-part dup breakdown — non-zero
+// PartialDupRows mean THIS specific source part has rows the slow
+// path would dedup on its bucket flush. Boundaries lists the
+// "block-N ends at ts=T, block-N+1 (same series) starts at ts=T"
+// pairs WITHIN this part — these are exactly the rows the
+// merger-fast-path bug (merger.go:306 `<=`) emits.
+type AnalyzePerPart struct {
+       PartID         string
+       SegName        string
+       ShardName      string
+       SourceRoot     string
+       Boundaries     []AnalyzeBoundaryPair
+       Rows           uint64
+       UniqueKeys     uint64
+       PartialDupRows uint64
+}
+
+// AnalyzeBoundaryPair names a single boundary collision inside one
+// part: block at BlockA ends at (SeriesID, Timestamp), block at
+// BlockB (immediately following) begins at the same (SeriesID,
+// Timestamp). Both copies survived because the merger fast-path
+// wrote pendingBlock verbatim before doing row-level merge.
+type AnalyzeBoundaryPair struct {
+       SeriesID  common.SeriesID
+       Timestamp int64
+       VersionA  int64
+       VersionB  int64
+       BlockA    int
+       BlockB    int
+}
+
+// AnalyzeKeyMulti reports one (sid, ts) pair that has more than one
+// row on disk, along with every version banyandb stored for it.
+// Useful operator-facing proof of the "newest version wins" dedup
+// banyandb's merger applies.
+type AnalyzeKeyMulti struct {
+       Versions  []VersionWithPath
+       SeriesID  common.SeriesID
+       Timestamp int64
+}
+
+// VersionWithPath pairs a row's version field with the absolute part
+// directory it physically lives in, so the operator can grep / cat
+// the on-disk files directly to confirm duplicates.
+type VersionWithPath struct {
+       Path    string
+       Version int64
+}
+
+type analyzeKey struct {
+       sid common.SeriesID
+       ts  int64
+}
+
+// pathTable interns part paths so we don't pay a string allocation
+// per row in the (sid, ts) → []rowSrc map. Each unique path lands at
+// a small uint32 index; per-row overhead drops from ~80B to 12B.
+type pathTable struct {
+       idx   map[string]uint32
+       paths []string
+}
+
+func newPathTable() *pathTable {
+       return &pathTable{idx: make(map[string]uint32, 256)}
+}
+
+func (t *pathTable) intern(p string) uint32 {
+       if i, ok := t.idx[p]; ok {
+               return i
+       }
+       i := uint32(len(t.paths))
+       t.paths = append(t.paths, p)
+       t.idx[p] = i
+       return i
+}
+
+func (t *pathTable) get(i uint32) string {
+       return t.paths[int(i)]
+}
+
+// analyzeRowSrc is a (version, partPathIdx) tuple stored in the
+// (sid, ts) → []analyzeRowSrc map. partPathIdx is an index into the
+// scan's pathTable.
+type analyzeRowSrc struct {
+       version int64
+       pathIdx uint32
+}
+
+// ResolveEntrySrcRoots is the exported wrapper around resolveEntrySrcRoots
+// so cmd/migration can reach the same plan→roots logic the copy + verify
+// pipelines use internally.
+func ResolveEntrySrcRoots(cfg DirectCopyConfig, entry DirectCopyEntry, group 
string) []string {
+       return resolveEntrySrcRoots(cfg, entry, group)
+}
+
+// AnalyzeMissingRow names one row that exists in src but not in tgt
+// after a migration copy: (sid, ts) appears more times physically in
+// source than in target, and MissingVersions lists the version values
+// that didn't survive into the target (sorted ascending).
+type AnalyzeMissingRow struct {
+       MissingVersions []VersionWithPath
+       SourceVersions  []VersionWithPath
+       TargetVersions  []VersionWithPath
+       SeriesID        common.SeriesID
+       Timestamp       int64
+}
+
+// AnalyzeDiffResult is the source-vs-target multiset diff for one
+// (entry, group). It is filled in by AnalyzeGroupDiffWithTarget and
+// answers the question "which exact rows did `migration copy` drop?".
+type AnalyzeDiffResult struct {
+       Missing     []AnalyzeMissingRow
+       SourceRows  uint64
+       TargetRows  uint64
+       MissingRows uint64
+       MissingKeys int
+}
+
+// AnalyzeGroupRows walks every <root>/seg-*/shard-*/<partID>/ in
+// srcRoots, opens each part read-only, decodes only block metadata +
+// timestamps (NOT tag/field bodies — keeps the scan cheap), and
+// aggregates (seriesID, timestamp) keys with their versions.
+//
+// Returns:
+//   - TotalRows     : every physical row encountered on disk
+//   - UniqueKeys    : distinct (sid, ts) pairs
+//   - DuplicateRows : TotalRows - UniqueKeys
+//   - SamplesByVersion[0:sampleCap]: first few (sid, ts) keys that had
+//     >1 row on disk, with all version values they carried.
+//
+// Memory: roughly 32 bytes per unique key + (versions × 8B). Cold-tier
+// groups with hundreds of millions of unique keys can be too large
+// for a 20 GiB pod — call AnalyzeGroupRows on a small (entry, group)
+// pair as a sanity probe, not on every shard at once.
+func AnalyzeGroupRows(srcRoots []string, fileSystem fs.FileSystem, sampleCap 
int) (AnalyzeGroupResult, error) {
+       var res AnalyzeGroupResult
+       keys := make(map[analyzeKey][]analyzeRowSrc, 1<<20)
+       paths := newPathTable()
+
+       // Pre-pass: open each part's metadata only (cheap) to learn the
+       // expected total row count, so the 10s ticker can report a real %.
+       expectedTotal, err := sumGroupExpectedRows(srcRoots, fileSystem)
+       if err != nil {
+               return res, fmt.Errorf("count expected rows: %w", err)
+       }
+       progressDone := startScanProgressReporter(&res.TotalRows, 
uint64(expectedTotal))
+       defer close(progressDone)
+
+       var (
+               compressed []byte
+               raw        []byte
+               bms        []blockMetadata
+               timestamps []int64
+               versions   []int64
+       )
+
+       for _, root := range srcRoots {
+               segEntries, err := os.ReadDir(root)
+               if err != nil {
+                       if os.IsNotExist(err) {
+                               continue
+                       }
+                       return res, fmt.Errorf("read src root %s: %w", root, 
err)
+               }
+               for _, se := range segEntries {
+                       if !se.IsDir() || !strings.HasPrefix(se.Name(), 
directCopySegPrefix) {
+                               continue
+                       }
+                       segDir := filepath.Join(root, se.Name())
+                       shardEntries, shardReadErr := os.ReadDir(segDir)
+                       if shardReadErr != nil {
+                               return res, fmt.Errorf("read src seg %s: %w", 
segDir, shardReadErr)
+                       }
+                       for _, sh := range shardEntries {
+                               if !sh.IsDir() || !strings.HasPrefix(sh.Name(), 
directCopyShardPrefix) {
+                                       continue
+                               }
+                               shardDir := filepath.Join(segDir, sh.Name())
+                               partEntries, partsReadErr := 
os.ReadDir(shardDir)
+                               if partsReadErr != nil {
+                                       return res, fmt.Errorf("read src shard 
%s: %w", shardDir, partsReadErr)
+                               }
+                               for _, pe := range partEntries {
+                                       if !pe.IsDir() || 
!directCopyPartDirPattern.MatchString(pe.Name()) {
+                                               continue
+                                       }
+                                       partID, parseErr := 
strconv.ParseUint(pe.Name(), 16, 64)
+                                       if parseErr != nil {
+                                               return res, fmt.Errorf("parse 
partID %s: %w", pe.Name(), parseErr)
+                                       }
+                                       partDupKeys := make(map[analyzeKey]int, 
1<<12)
+                                       partPath := filepath.Join(shardDir, 
pe.Name())
+                                       rowsThisPart, boundaries, scanErr := 
scanOnePartIntoMap(
+                                               partID, shardDir, partPath, 
fileSystem,
+                                               keys, paths, partDupKeys, &res,
+                                               &compressed, &raw, &bms, 
&timestamps, &versions,
+                                       )
+                                       if scanErr != nil {
+                                               return res, scanErr
+                                       }
+                                       var partDups uint64
+                                       for _, c := range partDupKeys {
+                                               if c > 1 {
+                                                       partDups += uint64(c - 
1)
+                                               }
+                                       }
+                                       res.PerPartDupRows += partDups
+                                       if partDups > 0 {
+                                               res.PerPartDups = 
append(res.PerPartDups, AnalyzePerPart{
+                                                       PartID:         
pe.Name(),
+                                                       SegName:        
se.Name(),
+                                                       ShardName:      
sh.Name(),
+                                                       SourceRoot:     root,
+                                                       Boundaries:     
boundaries,
+                                                       Rows:           
rowsThisPart,
+                                                       UniqueKeys:     
uint64(len(partDupKeys)),
+                                                       PartialDupRows: 
partDups,
+                                               })
+                                       }
+                               }
+                       }
+               }
+       }
+
+       res.UniqueKeys = uint64(len(keys))
+       res.DuplicateRows = res.TotalRows - res.UniqueKeys
+
+       // Single pass to count every (sid, ts) key with >1 row AND collect
+       // up to sampleCap samples. Counting always; sampling stops at cap.
+       for k, vs := range keys {
+               if len(vs) <= 1 {
+                       continue
+               }
+               res.KeysWithDuplicates++
+               if sampleCap > 0 && len(res.SamplesByVersion) < sampleCap {
+                       res.SamplesByVersion = append(res.SamplesByVersion, 
AnalyzeKeyMulti{
+                               SeriesID:  k.sid,
+                               Timestamp: k.ts,
+                               Versions:  rowSrcsToVersionsWithPath(vs, paths),
+                       })
+               }
+       }
+       return res, nil
+}
+
+// rowSrcsToVersionsWithPath turns the internal (version, pathIdx)
+// slice into the operator-facing []VersionWithPath, sorted by version
+// ascending. Used by every CLI emission so output stays consistent.
+func rowSrcsToVersionsWithPath(rs []analyzeRowSrc, paths *pathTable) 
[]VersionWithPath {
+       out := make([]VersionWithPath, len(rs))
+       for i, r := range rs {
+               out[i] = VersionWithPath{Version: r.version, Path: 
paths.get(r.pathIdx)}
+       }
+       sort.Slice(out, func(i, j int) bool { return out[i].Version < 
out[j].Version })
+       return out
+}
+
+// scanOnePartIntoMap opens one part, walks every block in on-disk
+// order, and:
+//
+//   - merges every row's (version, partPath) under its (seriesID,
+//     timestamp) key in keys (global, across all parts). The path is
+//     interned via the shared pathTable so per-row overhead stays at
+//     12B regardless of how long the path is.
+//   - increments partDupKeys per-part so the caller derives the
+//     within-part dedup count
+//   - records every boundary collision (block N ends at ts=T,
+//     block N+1 begins at ts=T, same series) — these are exactly
+//     the rows the merger-fast-path bug at merger.go:306 emits
+//
+// Returns rowsInPart, boundaries, error.
+func scanOnePartIntoMap(
+       partID uint64,
+       shardDir, partPath string,
+       fileSystem fs.FileSystem,
+       keys map[analyzeKey][]analyzeRowSrc,
+       paths *pathTable,
+       partDupKeys map[analyzeKey]int,
+       res *AnalyzeGroupResult,
+       compressed, raw *[]byte,
+       bms *[]blockMetadata,
+       timestamps, versions *[]int64,
+) (partRows uint64, boundaries []AnalyzeBoundaryPair, err error) {
+       var p *part
+       defer func() {
+               if p != nil {
+                       p.close()
+               }
+               if r := recover(); r != nil {
+                       err = fmt.Errorf("scan part %016x panicked: %v", 
partID, r)
+               }
+       }()
+       p = mustOpenFilePart(partID, shardDir, fileSystem)
+       res.PartsScanned++
+
+       blockIdx := 0
+       var prevSet bool
+       var prevSid common.SeriesID
+       var prevTS, prevVersion int64
+       var prevBlockIdx int
+
+       for i := range p.primaryBlockMetadata {
+               pbm := &p.primaryBlockMetadata[i]
+               *compressed = bytes.ResizeOver(*compressed, int(pbm.size))
+               fs.MustReadData(p.primary, int64(pbm.offset), *compressed)
+               var err error
+               *raw, err = zstd.Decompress((*raw)[:0], *compressed)
+               if err != nil {
+                       return partRows, boundaries, fmt.Errorf("decompress 
primary block: %w", err)
+               }
+               *bms, err = unmarshalBlockMetadata((*bms)[:0], *raw)
+               if err != nil {
+                       return partRows, boundaries, fmt.Errorf("unmarshal 
block metadata: %w", err)
+               }
+               for j := range *bms {
+                       bm := &(*bms)[j]
+                       *timestamps, *versions = 
mustReadTimestampsFrom((*timestamps)[:0], (*versions)[:0], &bm.timestamps, 
int(bm.count), p.timestamps)
+                       n := int(bm.count)
+                       if n == 0 {
+                               blockIdx++
+                               continue
+                       }
+                       if prevSet && bm.seriesID == prevSid && 
(*timestamps)[0] == prevTS {
+                               boundaries = append(boundaries, 
AnalyzeBoundaryPair{
+                                       SeriesID:  prevSid,
+                                       Timestamp: prevTS,
+                                       VersionA:  prevVersion,
+                                       VersionB:  (*versions)[0],
+                                       BlockA:    prevBlockIdx,
+                                       BlockB:    blockIdx,
+                               })
+                       }
+                       partPathIdx := paths.intern(partPath)
+                       for k := 0; k < n; k++ {
+                               ak := analyzeKey{sid: bm.seriesID, ts: 
(*timestamps)[k]}
+                               keys[ak] = append(keys[ak], 
analyzeRowSrc{version: (*versions)[k], pathIdx: partPathIdx})
+                               partDupKeys[ak]++
+                               atomic.AddUint64(&res.TotalRows, 1)
+                               partRows++
+                       }
+                       prevSet = true
+                       prevSid = bm.seriesID
+                       prevTS = (*timestamps)[n-1]
+                       prevVersion = (*versions)[n-1]
+                       prevBlockIdx = blockIdx
+                       blockIdx++
+               }
+       }
+       return partRows, boundaries, nil
+}
+
+// AnalyzeGroupDiffWithTarget walks both source roots and the target
+// group root, builds a (sid, ts) → []version multiset for each side,
+// and returns the rows that exist in src but NOT in target — i.e. the
+// exact rows `migration copy` dropped at slow-path bucket-flush time.
+//
+// For each (sid, ts) where len(src.versions) > len(tgt.versions), the
+// missing version values are computed via multiset subtraction (every
+// version in tgt cancels one identical entry in src; anything left in
+// src is reported as missing). MissingRows summed should equal
+// `verify`'s src-tgt row diff for the same (entry, group).
+//
+// Memory: ~2x of AnalyzeGroupRows since both src and tgt maps are
+// held concurrently. Use on small/medium entries.
+func AnalyzeGroupDiffWithTarget(srcRoots []string, targetGroupRoot string, 
fileSystem fs.FileSystem, sampleCap int) (AnalyzeDiffResult, error) {
+       var res AnalyzeDiffResult
+
+       srcParts, srcPathByID, err := openAllPartsInRoots(srcRoots, fileSystem)
+       if err != nil {
+               return res, fmt.Errorf("open src parts: %w", err)
+       }
+       defer closeAllParts(srcParts)
+       tgtParts, tgtPathByID, err := 
openAllPartsInRoots([]string{targetGroupRoot}, fileSystem)
+       if err != nil {
+               return res, fmt.Errorf("open tgt parts: %w", err)
+       }
+       defer closeAllParts(tgtParts)
+
+       var srcTotal, tgtTotal int64
+       for _, p := range srcParts {
+               srcTotal += int64(p.partMetadata.TotalCount)
+       }
+       for _, p := range tgtParts {
+               tgtTotal += int64(p.partMetadata.TotalCount)
+       }
+
+       srcStream, srcCleanup := newSidGroupStream(srcParts, srcPathByID)
+       defer srcCleanup()
+       tgtStream, tgtCleanup := newSidGroupStream(tgtParts, tgtPathByID)
+       defer tgtCleanup()
+
+       progressDone := startDiffProgressReporter(&srcStream.rowsRead, 
&tgtStream.rowsRead, srcTotal, tgtTotal)
+       defer close(progressDone)
+
+       // blockReader emits blocks in (seriesID asc, minTimestamp asc) order
+       // but does NOT row-sort by ts WITHIN a seriesID when that series
+       // spans overlapping ts ranges across parts. So we buffer one whole
+       // seriesID's worth of rows from each side, sort by ts, then
+       // row-merge-diff at ts granularity — same algorithm as before but
+       // guaranteed sorted within each (sid, ts).
+       srcGrp, srcOK := srcStream.nextGroup()
+       tgtGrp, tgtOK := tgtStream.nextGroup()
+       for srcOK || tgtOK {
+               var cmpSid int
+               switch {
+               case !srcOK:
+                       cmpSid = +1
+               case !tgtOK:
+                       cmpSid = -1
+               case srcGrp.sid < tgtGrp.sid:
+                       cmpSid = -1
+               case srcGrp.sid > tgtGrp.sid:
+                       cmpSid = +1
+               default:
+                       cmpSid = 0
+               }
+               switch {
+               case cmpSid < 0:
+                       absorbSrcOnlyGroup(srcGrp, sampleCap, &res)
+                       srcGrp, srcOK = srcStream.nextGroup()
+               case cmpSid > 0:
+                       res.TargetRows += uint64(len(tgtGrp.rows))
+                       tgtGrp, tgtOK = tgtStream.nextGroup()
+               default:
+                       diffSidGroup(srcGrp, tgtGrp, sampleCap, &res)
+                       srcGrp, srcOK = srcStream.nextGroup()
+                       tgtGrp, tgtOK = tgtStream.nextGroup()
+               }
+       }
+       if err := srcStream.err(); err != nil {
+               return res, fmt.Errorf("src block reader: %w", err)
+       }
+       if err := tgtStream.err(); err != nil {
+               return res, fmt.Errorf("tgt block reader: %w", err)
+       }
+       return res, nil
+}
+
+// absorbSrcOnlyGroup records every src row of a series whose sid does
+// not exist on the target side: each (sid, ts) bucket of src becomes
+// a missing entry, with all its versions reported.
+func absorbSrcOnlyGroup(g *sidGroup, sampleCap int, res *AnalyzeDiffResult) {
+       res.SourceRows += uint64(len(g.rows))
+       i := 0
+       for i < len(g.rows) {
+               ts := g.rows[i].ts
+               j := i
+               for j < len(g.rows) && g.rows[j].ts == ts {
+                       j++
+               }
+               srcVs := make([]VersionWithPath, 0, j-i)
+               for k := i; k < j; k++ {
+                       srcVs = append(srcVs, VersionWithPath{Version: 
g.rows[k].version, Path: g.rows[k].partPath})
+               }
+               res.MissingKeys++
+               res.MissingRows += uint64(len(srcVs))
+               if sampleCap == 0 || len(res.Missing) < sampleCap {
+                       sortVersionsAsc(srcVs)
+                       missingCopy := append([]VersionWithPath(nil), srcVs...)
+                       res.Missing = append(res.Missing, AnalyzeMissingRow{
+                               SeriesID:        g.sid,
+                               Timestamp:       ts,
+                               SourceVersions:  srcVs,
+                               TargetVersions:  nil,
+                               MissingVersions: missingCopy,
+                       })
+               }
+               i = j
+       }
+}
+
+// diffSidGroup runs the ts-by-ts multiset subtraction for one
+// matching (sid). Both groups are already ts-sorted; walk them in
+// merge fashion.
+func diffSidGroup(srcGrp, tgtGrp *sidGroup, sampleCap int, res 
*AnalyzeDiffResult) {
+       si, ti := 0, 0
+       for si < len(srcGrp.rows) || ti < len(tgtGrp.rows) {
+               var cmpTS int
+               switch {
+               case si >= len(srcGrp.rows):
+                       cmpTS = +1
+               case ti >= len(tgtGrp.rows):
+                       cmpTS = -1
+               case srcGrp.rows[si].ts < tgtGrp.rows[ti].ts:
+                       cmpTS = -1
+               case srcGrp.rows[si].ts > tgtGrp.rows[ti].ts:
+                       cmpTS = +1
+               default:
+                       cmpTS = 0
+               }
+               switch {
+               case cmpTS < 0:
+                       ts := srcGrp.rows[si].ts
+                       start := si
+                       for si < len(srcGrp.rows) && srcGrp.rows[si].ts == ts {
+                               si++
+                       }
+                       srcVs := make([]VersionWithPath, 0, si-start)
+                       for k := start; k < si; k++ {
+                               srcVs = append(srcVs, VersionWithPath{Version: 
srcGrp.rows[k].version, Path: srcGrp.rows[k].partPath})
+                       }
+                       res.SourceRows += uint64(len(srcVs))
+                       res.MissingKeys++
+                       res.MissingRows += uint64(len(srcVs))
+                       if sampleCap == 0 || len(res.Missing) < sampleCap {
+                               sortVersionsAsc(srcVs)
+                               missingCopy := append([]VersionWithPath(nil), 
srcVs...)
+                               res.Missing = append(res.Missing, 
AnalyzeMissingRow{
+                                       SeriesID:        srcGrp.sid,
+                                       Timestamp:       ts,
+                                       SourceVersions:  srcVs,
+                                       TargetVersions:  nil,
+                                       MissingVersions: missingCopy,
+                               })
+                       }
+               case cmpTS > 0:
+                       ts := tgtGrp.rows[ti].ts
+                       for ti < len(tgtGrp.rows) && tgtGrp.rows[ti].ts == ts {
+                               res.TargetRows++
+                               ti++
+                       }
+               default:
+                       ts := srcGrp.rows[si].ts
+                       startSrc := si
+                       for si < len(srcGrp.rows) && srcGrp.rows[si].ts == ts {
+                               si++
+                       }
+                       startTgt := ti
+                       for ti < len(tgtGrp.rows) && tgtGrp.rows[ti].ts == ts {
+                               ti++
+                       }
+                       srcVs := make([]VersionWithPath, 0, si-startSrc)
+                       for k := startSrc; k < si; k++ {
+                               srcVs = append(srcVs, VersionWithPath{Version: 
srcGrp.rows[k].version, Path: srcGrp.rows[k].partPath})
+                       }
+                       tgtVs := make([]VersionWithPath, 0, ti-startTgt)
+                       for k := startTgt; k < ti; k++ {
+                               tgtVs = append(tgtVs, VersionWithPath{Version: 
tgtGrp.rows[k].version, Path: tgtGrp.rows[k].partPath})
+                       }
+                       res.SourceRows += uint64(len(srcVs))
+                       res.TargetRows += uint64(len(tgtVs))
+                       if len(tgtVs) >= len(srcVs) {
+                               continue
+                       }
+                       tgtCount := make(map[int64]int, len(tgtVs))
+                       for _, t := range tgtVs {
+                               tgtCount[t.Version]++
+                       }
+                       var missing []VersionWithPath
+                       for _, s := range srcVs {
+                               if tgtCount[s.Version] > 0 {
+                                       tgtCount[s.Version]--
+                                       continue
+                               }
+                               missing = append(missing, s)
+                       }
+                       if len(missing) == 0 {
+                               continue
+                       }
+                       res.MissingKeys++
+                       res.MissingRows += uint64(len(missing))
+                       if sampleCap == 0 || len(res.Missing) < sampleCap {
+                               sortVersionsAsc(srcVs)
+                               sortVersionsAsc(tgtVs)
+                               sortVersionsAsc(missing)
+                               res.Missing = append(res.Missing, 
AnalyzeMissingRow{
+                                       SeriesID:        srcGrp.sid,
+                                       Timestamp:       ts,
+                                       SourceVersions:  srcVs,
+                                       TargetVersions:  tgtVs,
+                                       MissingVersions: missing,
+                               })
+                       }
+               }
+       }
+}
+
+// sidGroup is one seriesID's full row set (across all parts), with
+// rows sorted by timestamp ascending. Built by sidGroupStream.
+type sidGroup struct {
+       rows []sidRow
+       sid  common.SeriesID
+}
+
+type sidRow struct {
+       partPath string
+       ts       int64
+       version  int64
+}
+
+func sortVersionsAsc(vs []VersionWithPath) {
+       sort.Slice(vs, func(i, j int) bool { return vs[i].Version < 
vs[j].Version })
+}
+
+// openAllPartsInRoots enumerates every part dir under the given roots,
+// opens each via mustOpenFilePart, and returns the *part slice + a
+// partID → full path map. Caller must closeAllParts when done.
+func openAllPartsInRoots(roots []string, fileSystem fs.FileSystem) ([]*part, 
map[uint64]string, error) {
+       var parts []*part
+       pathByID := make(map[uint64]string, 64)
+       for _, root := range roots {
+               segEntries, err := os.ReadDir(root)
+               if err != nil {
+                       if os.IsNotExist(err) {
+                               continue
+                       }
+                       closeAllParts(parts)
+                       return nil, nil, fmt.Errorf("read root %s: %w", root, 
err)
+               }

Review Comment:
   openAllPartsInRoots (and other analyzers) accept an fs.FileSystem but still 
use os.ReadDir/path walking directly. This couples the analyzer to the local OS 
filesystem and makes the fs.FileSystem argument misleading; consider using 
fileSystem.ReadDir/fileSystem.IsExist equivalents throughout the traversal, or 
remove the parameter to clarify expectations.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to