Copilot commented on code in PR #1187:
URL: 
https://github.com/apache/skywalking-banyandb/pull/1187#discussion_r3456739057


##########
banyand/cmd/migration/analyze.go:
##########
@@ -212,6 +220,73 @@ func runMeasureAnalyze(entries []migration.ResolvedEntry, 
entry migration.Resolv
        return nil
 }
 
+// runIndexModeAnalyze runs the index-mode-flavored analyze logic: it reads the
+// group's sidx documents (not part block metadata) and reports total docs,
+// distinct (series, timestamp) keys, version duplicates and value conflicts —
+// a (series, timestamp) appearing in >1 doc with differing value digests.
+func runIndexModeAnalyze(schemaRoot string, entries []migration.ResolvedEntry, 
entry migration.ResolvedEntry,
+       entryIdx int, srcRoots []string, groupName string, sampleCap int,
+) error {
+       fmt.Printf("== analyze entry [%d/%d] stage=%s nodes=%v group=%s 
(index-mode) ==\n",
+               entryIdx+1, len(entries), entry.Stage, entry.Nodes, groupName)
+       for _, r := range srcRoots {
+               fmt.Printf("  src: %s\n", r)
+       }
+       start := time.Now()
+       res, err := measure.AnalyzeIndexModeGroup(context.Background(), 
schemaRoot, groupName, srcRoots, sampleCap)
+       if err != nil {
+               return err
+       }
+       elapsed := time.Since(start)
+
+       fmt.Printf("  sidx dirs scanned            : %d\n", res.PartsScanned)
+       fmt.Printf("  total docs on disk           : %d\n", res.TotalRows)
+       fmt.Printf("  distinct (sid, ts)           : %d\n", res.UniqueKeys)
+       fmt.Printf("  version-duplicate docs       : %d  (same (sid, ts) seen 
in >1 doc)\n", res.DuplicateRows)
+       fmt.Printf("  (sid, ts) keys with >1 doc   : %d\n", 
res.KeysWithDuplicates)
+       fmt.Printf("  VALUE-CONFLICT keys          : %d  (same (sid, ts), 
differing value digest)\n", res.ValueConflictKeys)
+       fmt.Printf("  elapsed                      : %v\n", elapsed)
+       if res.KeysWithDuplicates > 0 {
+               shown := len(res.SamplesByVersion)
+               fmt.Printf("  sample (sid, ts) with >1 doc (showing %d / %d 
keys):\n", shown, res.KeysWithDuplicates)
+               for _, s := range res.SamplesByVersion {
+                       ts := time.Unix(0, 
s.Timestamp).UTC().Format(time.RFC3339)
+                       fmt.Printf("    sid=%d ts=%s\n", s.SeriesID, ts)
+                       for _, v := range s.Versions {
+                               fmt.Printf("      version=%d\n", v.Version)
+                       }
+               }
+               if uint64(shown) < res.KeysWithDuplicates {
+                       fmt.Printf("    ... %d more (raise --sample to see 
them)\n", res.KeysWithDuplicates-uint64(shown))
+               }
+       }
+       if res.ValueConflictKeys > 0 {
+               shown := len(res.ValueConflicts)
+               fmt.Printf("  VALUE CONFLICTS (showing %d / %d keys; same (sid, 
ts) but differing data values):\n",
+                       shown, res.ValueConflictKeys)
+               for _, c := range res.ValueConflicts {
+                       ts := time.Unix(0, 
c.Timestamp).UTC().Format(time.RFC3339)
+                       fmt.Printf("    sid=%d ts=%s\n", c.SeriesID, ts)
+                       for i := range c.Versions {
+                               fmt.Printf("      version=%d digest=%016x\n", 
c.Versions[i], c.Digests[i])
+                       }
+               }
+               if uint64(shown) < res.ValueConflictKeys {
+                       fmt.Printf("    ... %d more (raise --sample to see 
them)\n", res.ValueConflictKeys-uint64(shown))
+               }
+       }
+       fmt.Println()
+       fmt.Println("Interpretation:")
+       fmt.Println("  - index-mode data lives in the segment sidx, one 
upserted doc per (series, timestamp).")
+       fmt.Println("  - version-duplicate docs are normal across source 
segments (target keeps max version).")
+       fmt.Println("  - VALUE-CONFLICT keys are (sid, ts) whose data values 
differ between docs. For metadata")

Review Comment:
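   This user-facing analyze output is inaccurate. The "Interpretation" line a 
few lines above ("one upserted doc per (series, timestamp)") is wrong: 
index-mode data is stored/upserted per (series, segment), not per (series, 
timestamp). The rest of the command reports duplicates and conflicts keyed by 
(sid, ts), i.e. it shows that several docs can share one (sid, ts), so this 
line contradicts the report it explains and is likely to mislead operators.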



##########
banyand/measure/migration_indexmode_copy.go:
##########
@@ -0,0 +1,1112 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "context"
+       "encoding/binary"
+       "fmt"
+       "hash/fnv"
+       "io"
+       "os"
+       "path/filepath"
+       "sort"
+       "strings"
+
+       "github.com/blugelabs/bluge"
+       blugesearch "github.com/blugelabs/bluge/search"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/banyand/internal/migration"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/index"
+       "github.com/apache/skywalking-banyandb/pkg/index/inverted"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+)
+
+// Reserved bluge stored-field names written by the inverted store's toDoc
+// (pkg/index/inverted/inverted.go:62-65). They are not tags and must be
+// handled explicitly rather than treated as tag fields.
+const (
+       imDocIDField     = "_id"
+       imTimestampField = "_timestamp"
+       imVersionField   = "_version"
+)
+
+// classifyGroup decides how to route a measure group. The internal
+// _top_n_result (TopNSchemaName) measure is auto-created by BanyanDB for every
+// measure group, so it never counts as a "real" normal measure. A group with
+// at least one index-mode measure and no real normal measure is an index-mode
+// group (isIndexMode=true); a group with no index-mode measure stays on the
+// existing normal path (false, nil); a group that mixes index-mode measures
+// with real normal measures is unsupported and returns an error listing the
+// offending normal measure names.
+func classifyGroup(group string, schemas map[string]*measureSchemaInfo) 
(isIndexMode bool, err error) {
+       var indexModes, realNormals []string
+       for name, m := range schemas {
+               if name == TopNSchemaName {
+                       continue
+               }
+               if m.IndexMode {
+                       indexModes = append(indexModes, name)
+               } else {
+                       realNormals = append(realNormals, name)
+               }
+       }
+       if len(indexModes) == 0 {
+               return false, nil
+       }
+       if len(realNormals) > 0 {
+               sort.Strings(realNormals)
+               return false, fmt.Errorf("group %q mixes index-mode measures 
with non-_top_n_result normal measure(s) %v; "+
+                       "mixed groups are unsupported", group, realNormals)
+       }
+       return true, nil
+}
+
+// collectTagNames gathers every tag name across the group's measures. A stored
+// field whose name is a known tag name is a non-indexed tag; this is what lets
+// classifyStoredField tell a 4-CHARACTER tag name apart from a 4-byte 
IndexRuleID
+// (both are 4 bytes wide, so name length alone is ambiguous).
+func collectTagNames(schemasBySubject map[string]*measureSchemaInfo) 
map[string]struct{} {
+       out := map[string]struct{}{}
+       for _, sc := range schemasBySubject {
+               if sc == nil {
+                       continue
+               }
+               for _, tf := range sc.TagFamilies {
+                       for _, tag := range tf.Tags {
+                               out[tag] = struct{}{}
+                       }
+               }
+       }
+       return out
+}
+
+// classifyStoredField reverses index.FieldKey.Marshal (pkg/index/index.go:73) 
for
+// one stored field name. A name that matches a schema tag name is a 
non-indexed
+// tag (TagName). Otherwise the name is a marshaled 4-byte IndexRuleID — an 
indexed
+// field — so it is kept INDEXED with that rule id; when the rule is missing 
from
+// ruleByID the field stays indexed (Analyzer/NoSort fall back to defaults) 
and the
+// id is returned as missingRuleID so the caller can warn rather than silently
+// dropping its searchability. A name that is neither a known tag nor 4 bytes 
wide
+// is unexpected; it is kept as a TagName best-effort.
+func classifyStoredField(name string, tagNames map[string]struct{}, ruleByID 
map[uint32]indexRuleInfo) (
+       key index.FieldKey, indexed bool, missingRuleID uint32,
+) {
+       if _, isTag := tagNames[name]; isTag {
+               return index.FieldKey{TagName: name}, false, 0
+       }
+       if len(name) == 4 {
+               id := convert.BytesToUint32([]byte(name))
+               if _, ok := ruleByID[id]; ok {
+                       return index.FieldKey{IndexRuleID: id}, true, 0
+               }
+               return index.FieldKey{IndexRuleID: id}, true, id
+       }
+       return index.FieldKey{TagName: name}, false, 0
+}
+
+// readIndexModeDocs scans every committed bluge doc under sidxDir and rebuilds
+// each as an index.Document from TWO sources:
+//
+//     A) stored fields -> regular tag fields (+ timestamp/version), and
+//     B) the doc _id unmarshaled to a pbv1.Series -> regenerate the index-only
+//        _im_name / _im_entity_tag_* fields, which the write path never 
stores.
+//
+// ruleByID restores Analyzer/NoSort on indexed fields; schemasBySubject maps
+// each series' Subject to its measure schema so the entity-derived fields are
+// regenerated with the right tag names/types and skip conditions.
+func readIndexModeDocs(ctx context.Context, sidxDir string, ruleByID 
map[uint32]indexRuleInfo,
+       schemasBySubject map[string]*measureSchemaInfo,
+) ([]index.Document, error) {
+       r, err := bluge.OpenReader(bluge.DefaultConfig(sidxDir))
+       if err != nil {
+               // A sidx directory can exist on disk with no committed bluge 
snapshot
+               // (a fresh segment whose writer never received a doc). Treat 
that as an
+               // empty sidx instead of failing the whole copy.
+               if strings.Contains(err.Error(), "unable to find a usable 
snapshot") {
+                       return nil, nil
+               }
+               return nil, fmt.Errorf("open sidx reader %s: %w", sidxDir, err)
+       }
+       defer func() { _ = r.Close() }()
+       dmi, err := r.Search(ctx, bluge.NewAllMatches(bluge.NewMatchAllQuery()))
+       if err != nil {
+               return nil, fmt.Errorf("search sidx %s: %w", sidxDir, err)
+       }
+       var out []index.Document
+       var timeless []uint64
+       tagNames := collectTagNames(schemasBySubject)
+       missingRules := map[uint32]int{}
+       for {
+               match, nextErr := dmi.Next()
+               if nextErr != nil {
+                       return nil, fmt.Errorf("iterate sidx %s: %w", sidxDir, 
nextErr)
+               }
+               if match == nil {
+                       break
+               }
+               doc, buildErr := rebuildOneDoc(match, ruleByID, 
schemasBySubject, tagNames, missingRules)
+               if buildErr != nil {
+                       return nil, fmt.Errorf("rebuild doc in %s: %w", 
sidxDir, buildErr)
+               }
+               // The write path always stamps a checked, non-zero _timestamp 
on every
+               // index-mode doc (write_standalone.go), so Timestamp==0 here 
means the
+               // stored _timestamp is missing or undecodable — corrupt or 
unexpected
+               // source data. Routing such a doc by a guessed segment 
boundary would
+               // silently misplace it, so collect the offenders (without 
retaining the
+               // corrupt docs) and surface them once the whole sidx has been 
scanned.
+               if doc.Timestamp == 0 {
+                       timeless = append(timeless, doc.DocID)
+                       continue
+               }
+               out = append(out, doc)
+       }
+       // Surface corrupt data before anything else: a missing-rule warning 
implies
+       // "continuing", which would be misleading when the read is about to 
abort.
+       if len(timeless) > 0 {
+               sample := timeless
+               if len(sample) > 10 {
+                       sample = sample[:10]
+               }
+               return nil, fmt.Errorf("sidx %s: %d index-mode doc(s) have a 
missing or undecodable _timestamp (ts==0), "+
+                       "which never occurs for valid data; the source is 
corrupt or unexpected — sample series IDs (first %d) %v",
+                       sidxDir, len(timeless), len(sample), sample)
+       }
+       warnMissingRules(sidxDir, missingRules)
+       return out, nil
+}
+
+// warnMissingRules logs a single warning enumerating every IndexRuleID that an
+// indexed stored field decoded to but that is NOT registered in the schema-
+// property catalog, with its occurrence count. The fields are still kept 
indexed
+// (their Analyzer/NoSort fall back to defaults), but a non-empty list means 
the
+// catalog is missing index rules the data was written with, so the operator 
should
+// know rather than have a possibly-searchable field silently degraded.
+func warnMissingRules(sidxDir string, missingRules map[uint32]int) {
+       if len(missingRules) == 0 {
+               return
+       }
+       ids := make([]string, 0, len(missingRules))
+       total := 0
+       for id, count := range missingRules {
+               ids = append(ids, fmt.Sprintf("%d×%d", id, count))
+               total += count
+       }
+       sort.Strings(ids)
+       logger.GetLogger("measure-migration").Warn().
+               Str("sidx", sidxDir).
+               Int("distinctRules", len(missingRules)).
+               Int("totalFields", total).
+               Strs("ruleIDxCount", ids).
+               Msg("index-mode rebuild: indexed field(s) decode to an 
IndexRuleID missing from the schema-property " +
+                       "catalog; kept indexed with default Analyzer/NoSort — 
restore the missing index rule(s) for exact fidelity")
+}
+
+// rebuildOneDoc reconstructs a single index.Document from a bluge match,
+// combining the two sources documented on readIndexModeDocs. tagNames is the
+// group's known tag-name set (to tell a tag from a rule id). missingRules, 
when
+// non-nil, accumulates (rule-id -> count) for fields that decode to an
+// IndexRuleID NOT present in ruleByID, so the caller can warn.
+func rebuildOneDoc(match *blugesearch.DocumentMatch, ruleByID 
map[uint32]indexRuleInfo,
+       schemasBySubject map[string]*measureSchemaInfo, tagNames 
map[string]struct{}, missingRules map[uint32]int,
+) (index.Document, error) {
+       var entityValues []byte
+       var ts, version int64
+       var fields []index.Field
+       visitErr := match.VisitStoredFields(func(name string, value []byte) 
bool {
+               switch name {
+               case imDocIDField:
+                       entityValues = append([]byte(nil), value...)
+               case imTimestampField:
+                       if dt, decErr := bluge.DecodeDateTime(value); decErr == 
nil {
+                               ts = dt.UnixNano()
+                       }
+               case imVersionField:
+                       version = convert.BytesToInt64(value)
+               default:
+                       key, indexed, missingRuleID := 
classifyStoredField(name, tagNames, ruleByID)
+                       // NewBytesField clones value internally, so no extra 
copy of the
+                       // reusable bluge visit buffer is needed here.
+                       f := index.NewBytesField(key, value)
+                       f.Store = true
+                       f.Index = indexed
+                       if ri, ok := ruleByID[key.IndexRuleID]; indexed && ok {
+                               f.NoSort = ri.NoSort
+                               f.Key.Analyzer = ri.Analyzer
+                       }
+                       // A field that decodes to an IndexRuleID the catalog 
does not know: it
+                       // stays indexed (defaults), but the operator must be 
told rather than
+                       // have a possibly-searchable field silently degraded.
+                       if missingRuleID != 0 && missingRules != nil {
+                               missingRules[missingRuleID]++
+                       }
+                       fields = append(fields, f)
+               }
+               return true
+       })
+       if visitErr != nil {
+               return index.Document{}, fmt.Errorf("visit stored fields: %w", 
visitErr)
+       }
+       // Source B: rebuild the index-only entity-derived fields from _id.
+       var series pbv1.Series
+       if err := series.Unmarshal(entityValues); err != nil {
+               return index.Document{}, fmt.Errorf("unmarshal series from _id: 
%w", err)
+       }
+       fields = appendRegeneratedEntityFields(fields, &series, 
schemasBySubject[series.Subject])
+       return index.Document{
+               Fields:       fields,
+               EntityValues: entityValues,
+               Timestamp:    ts,
+               DocID:        uint64(series.ID),
+               Version:      version,
+       }, nil
+}
+
+// appendRegeneratedEntityFields rebuilds the index-only fields the production
+// write path adds in appendEntityTagsToIndexFields (write_standalone.go:410):
+// _im_name (subject) and _im_entity_tag_<tag> (entity values). These are never
+// stored, so they must be regenerated from the series, or entity-tag / 
measure-
+// name search breaks after migration.
+//
+// The entity-tag fields replicate the production skip condition: an entity tag
+// already indexed by an index rule is not re-emitted as _im_entity_tag_*.
+func appendRegeneratedEntityFields(fields []index.Field, series *pbv1.Series, 
schema *measureSchemaInfo) []index.Field {
+       subj := index.NewStringField(index.FieldKey{TagName: 
index.IndexModeName}, series.Subject)
+       subj.Index = true
+       subj.NoSort = true
+       fields = append(fields, subj)
+       if schema == nil {
+               return fields
+       }
+       for i, tagName := range schema.EntityTagNames {
+               if i >= len(series.EntityValues) {
+                       break
+               }
+               if _, indexed := schema.IndexedEntityTags[tagName]; indexed {
+                       continue
+               }
+               nv := encodeTagValue(tagName, schema.TagType[tagName], 
series.EntityValues[i])
+               value := nv.value
+               releaseNameValue(nv)
+               if value == nil {
+                       continue
+               }
+               f := index.NewBytesField(index.FieldKey{TagName: 
index.IndexModeEntityTagPrefix + tagName}, value)
+               f.Index = true
+               f.NoSort = true
+               fields = append(fields, f)
+       }
+       return fields
+}
+
+// ── Target idx store pool 
────────────────────────────────────────────────────.
+
+// targetIdxStore lazily opens one inverted store per target sidx path
+// (BatchWaitSec:0) and closes them all at the end, so multiple source segs
+// feeding the same target sidx share a single writer. Copied (cross-package,
+// unexported) from stream/migration_element_index.go:126-158.
+type targetIdxStore struct {
+       stores map[string]index.SeriesStore
+}
+
+func newTargetIdxStore() *targetIdxStore {
+       return &targetIdxStore{stores: map[string]index.SeriesStore{}}
+}
+
+func (t *targetIdxStore) get(path string) (index.SeriesStore, error) {
+       if s, ok := t.stores[path]; ok {
+               return s, nil
+       }
+       if err := os.MkdirAll(path, storage.DirPerm); err != nil {
+               return nil, fmt.Errorf("mkdir idx %s: %w", path, err)
+       }
+       s, err := inverted.NewStore(inverted.StoreOpts{Path: path, 
BatchWaitSec: 0})
+       if err != nil {
+               return nil, fmt.Errorf("open idx store %s: %w", path, err)
+       }
+       t.stores[path] = s
+       return s, nil
+}
+
+func (t *targetIdxStore) closeAll() error {
+       var firstErr error
+       for path, s := range t.stores {
+               if err := s.Close(); err != nil && firstErr == nil {
+                       firstErr = fmt.Errorf("close idx store %s: %w", path, 
err)
+               }
+       }
+       t.stores = map[string]index.SeriesStore{}
+       return firstErr
+}
+
+// ── Segment routing 
──────────────────────────────────────────────────────────.
+
+// ── Slow path: rebuild + route + merge keeping max version 
───────────────────.
+
+// copyIndexModeSlowDocs routes already-read docs (docsByDir, keyed by source
+// sidx dir) by their _timestamp to the aligned target seg's sidx, deduplicates
+// per series (DocID) keeping the highest Version within each target seg, and
+// upserts the survivors via UpdateSeriesBatch. The target seg's 
<segDir>/metadata
+// is written too, since a segment without it is treated as invalid and removed
+// at startup (storage/segment.go open loop). Routing all slow-path sources
+// through a single call is what makes the max-version dedup work ACROSS 
sources
+// that merge into the same target seg. Returns the number
+// of rows written.
+func copyIndexModeSlowDocs(ctx context.Context, srcSidxDirs []string, 
docsByDir map[string][]index.Document,
+       dstGroupRoot string, ir storage.IntervalRule, stores *targetIdxStore,
+) (rows int64, err error) {
+       // per target sidx path -> seriesID(DocID) -> survivor doc (max 
version).
+       perTarget := map[string]map[uint64]index.Document{}
+       // targetSegName per sidx path, to write the seg metadata once flushed.
+       segNameByPath := map[string]string{}
+       var segCache alignedSegCache
+       for _, dir := range srcSidxDirs {
+               docs := docsByDir[dir]
+               for di := range docs {
+                       d := docs[di]
+                       seg := segCache.segNameFor(ir, d.Timestamp)
+                       dstSidx := filepath.Join(dstGroupRoot, seg, 
directCopySidxDirName)
+                       segNameByPath[dstSidx] = seg
+                       m := perTarget[dstSidx]
+                       if m == nil {
+                               m = map[uint64]index.Document{}
+                               perTarget[dstSidx] = m
+                       }
+                       if prev, ok := m[d.DocID]; !ok || d.Version >= 
prev.Version {
+                               m[d.DocID] = d
+                       }
+               }
+       }
+       for dstSidx, m := range perTarget {
+               if ctx.Err() != nil {
+                       return rows, ctx.Err()
+               }
+               store, getErr := stores.get(dstSidx)
+               if getErr != nil {
+                       return rows, getErr
+               }
+               batch := index.Batch{Documents: make(index.Documents, 0, 
len(m))}
+               for _, d := range m {
+                       batch.Documents = append(batch.Documents, d)
+                       rows++
+               }
+               if updateErr := store.UpdateSeriesBatch(batch); updateErr != 
nil {
+                       return rows, fmt.Errorf("write target sidx %s: %w", 
dstSidx, updateErr)
+               }
+               segDir := filepath.Join(dstGroupRoot, segNameByPath[dstSidx])
+               if _, metaErr := writeIndexModeSegMetadata(segDir, ir); metaErr 
!= nil {
+                       return rows, metaErr
+               }
+       }
+       return rows, nil
+}
+
+// sortedSegList renders a target-segment set as a sorted comma-separated 
string
+// for deterministic per-source rebuild log lines.
+func sortedSegList(segs map[string]struct{}) string {
+       names := make([]string, 0, len(segs))
+       for s := range segs {
+               names = append(names, s)
+       }
+       sort.Strings(names)
+       return strings.Join(names, ",")
+}
+
+// ── Top-level orchestration 
──────────────────────────────────────────────────.
+
+// copyIndexModeGroup copies one (entry, group) for an index-mode measure 
group.
+// It rejects groups carrying shard parts (a non-_top_n_result normal measure
+// has data), then for each source seg sidx either byte-copies the whole sidx
+// directory (fast path: all docs align to one not-yet-written target seg) or
+// rebuilds and routes docs through the slow path. Each touched target segment
+// gets its <segDir>/metadata so the runtime loads it instead of pruning it.
+func copyIndexModeGroup(ctx context.Context, in migration.EntryGroupInput,
+       ruleByID map[uint32]indexRuleInfo, schemasBySubject 
map[string]*measureSchemaInfo,
+) (migration.EntryGroupResult, error) {
+       var res migration.EntryGroupResult
+       hasPart, err := hasShardPart(ctx, in.SrcRoots)

Review Comment:
   The deferred closeAll error-handling here will not work as intended because 
copyIndexModeGroup does not use named return values. When the function executes 
`return res, nil`, the returned error has already been evaluated to nil before 
the defer runs, so assigning to the local `err` inside the defer cannot 
propagate a close/flush failure to the caller (risking silent partial sidx 
writes). Switching to named return values would let the deferred function set 
the error that is actually returned.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to