Copilot commented on code in PR #1187:
URL:
https://github.com/apache/skywalking-banyandb/pull/1187#discussion_r3456739057
##########
banyand/cmd/migration/analyze.go:
##########
@@ -212,6 +220,73 @@ func runMeasureAnalyze(entries []migration.ResolvedEntry,
entry migration.Resolv
return nil
}
+// runIndexModeAnalyze runs the index-mode-flavored analyze logic: it reads the
+// group's sidx documents (not part block metadata) and reports total docs,
+// distinct (series, timestamp) keys, version duplicates and value conflicts —
+// a (series, timestamp) appearing in >1 doc with differing value digests.
+func runIndexModeAnalyze(schemaRoot string, entries []migration.ResolvedEntry,
entry migration.ResolvedEntry,
+ entryIdx int, srcRoots []string, groupName string, sampleCap int,
+) error {
+ fmt.Printf("== analyze entry [%d/%d] stage=%s nodes=%v group=%s
(index-mode) ==\n",
+ entryIdx+1, len(entries), entry.Stage, entry.Nodes, groupName)
+ for _, r := range srcRoots {
+ fmt.Printf(" src: %s\n", r)
+ }
+ start := time.Now()
+ res, err := measure.AnalyzeIndexModeGroup(context.Background(),
schemaRoot, groupName, srcRoots, sampleCap)
+ if err != nil {
+ return err
+ }
+ elapsed := time.Since(start)
+
+ fmt.Printf(" sidx dirs scanned : %d\n", res.PartsScanned)
+ fmt.Printf(" total docs on disk : %d\n", res.TotalRows)
+ fmt.Printf(" distinct (sid, ts) : %d\n", res.UniqueKeys)
+ fmt.Printf(" version-duplicate docs : %d (same (sid, ts) seen
in >1 doc)\n", res.DuplicateRows)
+ fmt.Printf(" (sid, ts) keys with >1 doc : %d\n",
res.KeysWithDuplicates)
+ fmt.Printf(" VALUE-CONFLICT keys : %d (same (sid, ts),
differing value digest)\n", res.ValueConflictKeys)
+ fmt.Printf(" elapsed : %v\n", elapsed)
+ if res.KeysWithDuplicates > 0 {
+ shown := len(res.SamplesByVersion)
+ fmt.Printf(" sample (sid, ts) with >1 doc (showing %d / %d
keys):\n", shown, res.KeysWithDuplicates)
+ for _, s := range res.SamplesByVersion {
+ ts := time.Unix(0,
s.Timestamp).UTC().Format(time.RFC3339)
+ fmt.Printf(" sid=%d ts=%s\n", s.SeriesID, ts)
+ for _, v := range s.Versions {
+ fmt.Printf(" version=%d\n", v.Version)
+ }
+ }
+ if uint64(shown) < res.KeysWithDuplicates {
+ fmt.Printf(" ... %d more (raise --sample to see
them)\n", res.KeysWithDuplicates-uint64(shown))
+ }
+ }
+ if res.ValueConflictKeys > 0 {
+ shown := len(res.ValueConflicts)
+ fmt.Printf(" VALUE CONFLICTS (showing %d / %d keys; same (sid,
ts) but differing data values):\n",
+ shown, res.ValueConflictKeys)
+ for _, c := range res.ValueConflicts {
+ ts := time.Unix(0,
c.Timestamp).UTC().Format(time.RFC3339)
+ fmt.Printf(" sid=%d ts=%s\n", c.SeriesID, ts)
+ for i := range c.Versions {
+ fmt.Printf(" version=%d digest=%016x\n",
c.Versions[i], c.Digests[i])
+ }
+ }
+ if uint64(shown) < res.ValueConflictKeys {
+ fmt.Printf(" ... %d more (raise --sample to see
them)\n", res.ValueConflictKeys-uint64(shown))
+ }
+ }
+ fmt.Println()
+ fmt.Println("Interpretation:")
+ fmt.Println(" - index-mode data lives in the segment sidx, one
upserted doc per (series, timestamp).")
+ fmt.Println(" - version-duplicate docs are normal across source
segments (target keeps max version).")
+ fmt.Println(" - VALUE-CONFLICT keys are (sid, ts) whose data values
differ between docs. For metadata")
Review Comment:
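This user-facing analyze output is inaccurate. Index-mode data is
stored/upserted once per (series, segment), not once per (series, timestamp).
This PR's own slow-path copy (copyIndexModeSlowDocs) confirms it: it
deduplicates by DocID (the series ID) within each target segment and keeps the
max version. The rest of the command reports duplicates and conflicts in terms
of (sid, ts) keys, so this line is likely to mislead operators into thinking
every timestamp gets its own doc. Suggest rewording it to "... one upserted doc
per (series, segment)."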
##########
banyand/measure/migration_indexmode_copy.go:
##########
@@ -0,0 +1,1112 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+ "context"
+ "encoding/binary"
+ "fmt"
+ "hash/fnv"
+ "io"
+ "os"
+ "path/filepath"
+ "sort"
+ "strings"
+
+ "github.com/blugelabs/bluge"
+ blugesearch "github.com/blugelabs/bluge/search"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/banyand/internal/migration"
+ "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/index"
+ "github.com/apache/skywalking-banyandb/pkg/index/inverted"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+)
+
+// Reserved bluge stored-field names written by the inverted store's toDoc
+// (pkg/index/inverted/inverted.go:62-65). They are not tags and must be
+// handled explicitly rather than treated as tag fields.
+const (
+ imDocIDField = "_id"
+ imTimestampField = "_timestamp"
+ imVersionField = "_version"
+)
+
+// classifyGroup decides how to route a measure group. The internal
+// _top_n_result (TopNSchemaName) measure is auto-created by BanyanDB for every
+// measure group, so it never counts as a "real" normal measure. A group with
+// at least one index-mode measure and no real normal measure is an index-mode
+// group (isIndexMode=true); a group with no index-mode measure stays on the
+// existing normal path (false, nil); a group that mixes index-mode measures
+// with real normal measures is unsupported and returns an error listing the
+// offending normal measure names.
+func classifyGroup(group string, schemas map[string]*measureSchemaInfo)
(isIndexMode bool, err error) {
+ var indexModes, realNormals []string
+ for name, m := range schemas {
+ if name == TopNSchemaName {
+ continue
+ }
+ if m.IndexMode {
+ indexModes = append(indexModes, name)
+ } else {
+ realNormals = append(realNormals, name)
+ }
+ }
+ if len(indexModes) == 0 {
+ return false, nil
+ }
+ if len(realNormals) > 0 {
+ sort.Strings(realNormals)
+ return false, fmt.Errorf("group %q mixes index-mode measures
with non-_top_n_result normal measure(s) %v; "+
+ "mixed groups are unsupported", group, realNormals)
+ }
+ return true, nil
+}
+
+// collectTagNames gathers every tag name across the group's measures. A stored
+// field whose name is a known tag name is a non-indexed tag; this is what lets
+// classifyStoredField tell a 4-CHARACTER tag name apart from a 4-byte
IndexRuleID
+// (both are 4 bytes wide, so name length alone is ambiguous).
+func collectTagNames(schemasBySubject map[string]*measureSchemaInfo)
map[string]struct{} {
+ out := map[string]struct{}{}
+ for _, sc := range schemasBySubject {
+ if sc == nil {
+ continue
+ }
+ for _, tf := range sc.TagFamilies {
+ for _, tag := range tf.Tags {
+ out[tag] = struct{}{}
+ }
+ }
+ }
+ return out
+}
+
+// classifyStoredField reverses index.FieldKey.Marshal (pkg/index/index.go:73)
for
+// one stored field name. A name that matches a schema tag name is a
non-indexed
+// tag (TagName). Otherwise the name is a marshaled 4-byte IndexRuleID — an
indexed
+// field — so it is kept INDEXED with that rule id; when the rule is missing
from
+// ruleByID the field stays indexed (Analyzer/NoSort fall back to defaults)
and the
+// id is returned as missingRuleID so the caller can warn rather than silently
+// dropping its searchability. A name that is neither a known tag nor 4 bytes
wide
+// is unexpected; it is kept as a TagName best-effort.
+func classifyStoredField(name string, tagNames map[string]struct{}, ruleByID
map[uint32]indexRuleInfo) (
+ key index.FieldKey, indexed bool, missingRuleID uint32,
+) {
+ if _, isTag := tagNames[name]; isTag {
+ return index.FieldKey{TagName: name}, false, 0
+ }
+ if len(name) == 4 {
+ id := convert.BytesToUint32([]byte(name))
+ if _, ok := ruleByID[id]; ok {
+ return index.FieldKey{IndexRuleID: id}, true, 0
+ }
+ return index.FieldKey{IndexRuleID: id}, true, id
+ }
+ return index.FieldKey{TagName: name}, false, 0
+}
+
+// readIndexModeDocs scans every committed bluge doc under sidxDir and rebuilds
+// each as an index.Document from TWO sources:
+//
+// A) stored fields -> regular tag fields (+ timestamp/version), and
+// B) the doc _id unmarshaled to a pbv1.Series -> regenerate the index-only
+// _im_name / _im_entity_tag_* fields, which the write path never
stores.
+//
+// ruleByID restores Analyzer/NoSort on indexed fields; schemasBySubject maps
+// each series' Subject to its measure schema so the entity-derived fields are
+// regenerated with the right tag names/types and skip conditions.
+func readIndexModeDocs(ctx context.Context, sidxDir string, ruleByID
map[uint32]indexRuleInfo,
+ schemasBySubject map[string]*measureSchemaInfo,
+) ([]index.Document, error) {
+ r, err := bluge.OpenReader(bluge.DefaultConfig(sidxDir))
+ if err != nil {
+ // A sidx directory can exist on disk with no committed bluge
snapshot
+ // (a fresh segment whose writer never received a doc). Treat
that as an
+ // empty sidx instead of failing the whole copy.
+ if strings.Contains(err.Error(), "unable to find a usable
snapshot") {
+ return nil, nil
+ }
+ return nil, fmt.Errorf("open sidx reader %s: %w", sidxDir, err)
+ }
+ defer func() { _ = r.Close() }()
+ dmi, err := r.Search(ctx, bluge.NewAllMatches(bluge.NewMatchAllQuery()))
+ if err != nil {
+ return nil, fmt.Errorf("search sidx %s: %w", sidxDir, err)
+ }
+ var out []index.Document
+ var timeless []uint64
+ tagNames := collectTagNames(schemasBySubject)
+ missingRules := map[uint32]int{}
+ for {
+ match, nextErr := dmi.Next()
+ if nextErr != nil {
+ return nil, fmt.Errorf("iterate sidx %s: %w", sidxDir,
nextErr)
+ }
+ if match == nil {
+ break
+ }
+ doc, buildErr := rebuildOneDoc(match, ruleByID,
schemasBySubject, tagNames, missingRules)
+ if buildErr != nil {
+ return nil, fmt.Errorf("rebuild doc in %s: %w",
sidxDir, buildErr)
+ }
+ // The write path always stamps a checked, non-zero _timestamp
on every
+ // index-mode doc (write_standalone.go), so Timestamp==0 here
means the
+ // stored _timestamp is missing or undecodable — corrupt or
unexpected
+ // source data. Routing such a doc by a guessed segment
boundary would
+ // silently misplace it, so collect the offenders (without
retaining the
+ // corrupt docs) and surface them once the whole sidx has been
scanned.
+ if doc.Timestamp == 0 {
+ timeless = append(timeless, doc.DocID)
+ continue
+ }
+ out = append(out, doc)
+ }
+ // Surface corrupt data before anything else: a missing-rule warning
implies
+ // "continuing", which would be misleading when the read is about to
abort.
+ if len(timeless) > 0 {
+ sample := timeless
+ if len(sample) > 10 {
+ sample = sample[:10]
+ }
+ return nil, fmt.Errorf("sidx %s: %d index-mode doc(s) have a
missing or undecodable _timestamp (ts==0), "+
+ "which never occurs for valid data; the source is
corrupt or unexpected — sample series IDs (first %d) %v",
+ sidxDir, len(timeless), len(sample), sample)
+ }
+ warnMissingRules(sidxDir, missingRules)
+ return out, nil
+}
+
+// warnMissingRules logs a single warning enumerating every IndexRuleID that an
+// indexed stored field decoded to but that is NOT registered in the schema-
+// property catalog, with its occurrence count. The fields are still kept
indexed
+// (their Analyzer/NoSort fall back to defaults), but a non-empty list means
the
+// catalog is missing index rules the data was written with, so the operator
should
+// know rather than have a possibly-searchable field silently degraded.
+func warnMissingRules(sidxDir string, missingRules map[uint32]int) {
+ if len(missingRules) == 0 {
+ return
+ }
+ ids := make([]string, 0, len(missingRules))
+ total := 0
+ for id, count := range missingRules {
+ ids = append(ids, fmt.Sprintf("%d×%d", id, count))
+ total += count
+ }
+ sort.Strings(ids)
+ logger.GetLogger("measure-migration").Warn().
+ Str("sidx", sidxDir).
+ Int("distinctRules", len(missingRules)).
+ Int("totalFields", total).
+ Strs("ruleIDxCount", ids).
+ Msg("index-mode rebuild: indexed field(s) decode to an
IndexRuleID missing from the schema-property " +
+ "catalog; kept indexed with default Analyzer/NoSort —
restore the missing index rule(s) for exact fidelity")
+}
+
+// rebuildOneDoc reconstructs a single index.Document from a bluge match,
+// combining the two sources documented on readIndexModeDocs. tagNames is the
+// group's known tag-name set (to tell a tag from a rule id). missingRules,
when
+// non-nil, accumulates (rule-id -> count) for fields that decode to an
+// IndexRuleID NOT present in ruleByID, so the caller can warn.
+func rebuildOneDoc(match *blugesearch.DocumentMatch, ruleByID
map[uint32]indexRuleInfo,
+ schemasBySubject map[string]*measureSchemaInfo, tagNames
map[string]struct{}, missingRules map[uint32]int,
+) (index.Document, error) {
+ var entityValues []byte
+ var ts, version int64
+ var fields []index.Field
+ visitErr := match.VisitStoredFields(func(name string, value []byte)
bool {
+ switch name {
+ case imDocIDField:
+ entityValues = append([]byte(nil), value...)
+ case imTimestampField:
+ if dt, decErr := bluge.DecodeDateTime(value); decErr ==
nil {
+ ts = dt.UnixNano()
+ }
+ case imVersionField:
+ version = convert.BytesToInt64(value)
+ default:
+ key, indexed, missingRuleID :=
classifyStoredField(name, tagNames, ruleByID)
+ // NewBytesField clones value internally, so no extra
copy of the
+ // reusable bluge visit buffer is needed here.
+ f := index.NewBytesField(key, value)
+ f.Store = true
+ f.Index = indexed
+ if ri, ok := ruleByID[key.IndexRuleID]; indexed && ok {
+ f.NoSort = ri.NoSort
+ f.Key.Analyzer = ri.Analyzer
+ }
+ // A field that decodes to an IndexRuleID the catalog
does not know: it
+ // stays indexed (defaults), but the operator must be
told rather than
+ // have a possibly-searchable field silently degraded.
+ if missingRuleID != 0 && missingRules != nil {
+ missingRules[missingRuleID]++
+ }
+ fields = append(fields, f)
+ }
+ return true
+ })
+ if visitErr != nil {
+ return index.Document{}, fmt.Errorf("visit stored fields: %w",
visitErr)
+ }
+ // Source B: rebuild the index-only entity-derived fields from _id.
+ var series pbv1.Series
+ if err := series.Unmarshal(entityValues); err != nil {
+ return index.Document{}, fmt.Errorf("unmarshal series from _id:
%w", err)
+ }
+ fields = appendRegeneratedEntityFields(fields, &series,
schemasBySubject[series.Subject])
+ return index.Document{
+ Fields: fields,
+ EntityValues: entityValues,
+ Timestamp: ts,
+ DocID: uint64(series.ID),
+ Version: version,
+ }, nil
+}
+
+// appendRegeneratedEntityFields rebuilds the index-only fields the production
+// write path adds in appendEntityTagsToIndexFields (write_standalone.go:410):
+// _im_name (subject) and _im_entity_tag_<tag> (entity values). These are never
+// stored, so they must be regenerated from the series, or entity-tag /
measure-
+// name search breaks after migration.
+//
+// The entity-tag fields replicate the production skip condition: an entity tag
+// already indexed by an index rule is not re-emitted as _im_entity_tag_*.
+func appendRegeneratedEntityFields(fields []index.Field, series *pbv1.Series,
schema *measureSchemaInfo) []index.Field {
+ subj := index.NewStringField(index.FieldKey{TagName:
index.IndexModeName}, series.Subject)
+ subj.Index = true
+ subj.NoSort = true
+ fields = append(fields, subj)
+ if schema == nil {
+ return fields
+ }
+ for i, tagName := range schema.EntityTagNames {
+ if i >= len(series.EntityValues) {
+ break
+ }
+ if _, indexed := schema.IndexedEntityTags[tagName]; indexed {
+ continue
+ }
+ nv := encodeTagValue(tagName, schema.TagType[tagName],
series.EntityValues[i])
+ value := nv.value
+ releaseNameValue(nv)
+ if value == nil {
+ continue
+ }
+ f := index.NewBytesField(index.FieldKey{TagName:
index.IndexModeEntityTagPrefix + tagName}, value)
+ f.Index = true
+ f.NoSort = true
+ fields = append(fields, f)
+ }
+ return fields
+}
+
+// ── Target idx store pool
────────────────────────────────────────────────────.
+
+// targetIdxStore lazily opens one inverted store per target sidx path
+// (BatchWaitSec:0) and closes them all at the end, so multiple source segs
+// feeding the same target sidx share a single writer. Copied (cross-package,
+// unexported) from stream/migration_element_index.go:126-158.
+type targetIdxStore struct {
+ stores map[string]index.SeriesStore
+}
+
+func newTargetIdxStore() *targetIdxStore {
+ return &targetIdxStore{stores: map[string]index.SeriesStore{}}
+}
+
+func (t *targetIdxStore) get(path string) (index.SeriesStore, error) {
+ if s, ok := t.stores[path]; ok {
+ return s, nil
+ }
+ if err := os.MkdirAll(path, storage.DirPerm); err != nil {
+ return nil, fmt.Errorf("mkdir idx %s: %w", path, err)
+ }
+ s, err := inverted.NewStore(inverted.StoreOpts{Path: path,
BatchWaitSec: 0})
+ if err != nil {
+ return nil, fmt.Errorf("open idx store %s: %w", path, err)
+ }
+ t.stores[path] = s
+ return s, nil
+}
+
+func (t *targetIdxStore) closeAll() error {
+ var firstErr error
+ for path, s := range t.stores {
+ if err := s.Close(); err != nil && firstErr == nil {
+ firstErr = fmt.Errorf("close idx store %s: %w", path,
err)
+ }
+ }
+ t.stores = map[string]index.SeriesStore{}
+ return firstErr
+}
+
+// ── Segment routing
──────────────────────────────────────────────────────────.
+
+// ── Slow path: rebuild + route + merge keeping max version
───────────────────.
+
+// copyIndexModeSlowDocs routes already-read docs (docsByDir, keyed by source
+// sidx dir) by their _timestamp to the aligned target seg's sidx, deduplicates
+// per series (DocID) keeping the highest Version within each target seg, and
+// upserts the survivors via UpdateSeriesBatch. The target seg's
<segDir>/metadata
+// is written too, since a segment without it is treated as invalid and removed
+// at startup (storage/segment.go open loop). Routing all slow-path sources
+// through a single call is what makes the max-version dedup work ACROSS
sources
+// that merge into the same target seg. Returns the number
+// of rows written.
+func copyIndexModeSlowDocs(ctx context.Context, srcSidxDirs []string,
docsByDir map[string][]index.Document,
+ dstGroupRoot string, ir storage.IntervalRule, stores *targetIdxStore,
+) (rows int64, err error) {
+ // per target sidx path -> seriesID(DocID) -> survivor doc (max
version).
+ perTarget := map[string]map[uint64]index.Document{}
+ // targetSegName per sidx path, to write the seg metadata once flushed.
+ segNameByPath := map[string]string{}
+ var segCache alignedSegCache
+ for _, dir := range srcSidxDirs {
+ docs := docsByDir[dir]
+ for di := range docs {
+ d := docs[di]
+ seg := segCache.segNameFor(ir, d.Timestamp)
+ dstSidx := filepath.Join(dstGroupRoot, seg,
directCopySidxDirName)
+ segNameByPath[dstSidx] = seg
+ m := perTarget[dstSidx]
+ if m == nil {
+ m = map[uint64]index.Document{}
+ perTarget[dstSidx] = m
+ }
+ if prev, ok := m[d.DocID]; !ok || d.Version >=
prev.Version {
+ m[d.DocID] = d
+ }
+ }
+ }
+ for dstSidx, m := range perTarget {
+ if ctx.Err() != nil {
+ return rows, ctx.Err()
+ }
+ store, getErr := stores.get(dstSidx)
+ if getErr != nil {
+ return rows, getErr
+ }
+ batch := index.Batch{Documents: make(index.Documents, 0,
len(m))}
+ for _, d := range m {
+ batch.Documents = append(batch.Documents, d)
+ rows++
+ }
+ if updateErr := store.UpdateSeriesBatch(batch); updateErr !=
nil {
+ return rows, fmt.Errorf("write target sidx %s: %w",
dstSidx, updateErr)
+ }
+ segDir := filepath.Join(dstGroupRoot, segNameByPath[dstSidx])
+ if _, metaErr := writeIndexModeSegMetadata(segDir, ir); metaErr
!= nil {
+ return rows, metaErr
+ }
+ }
+ return rows, nil
+}
+
+// sortedSegList renders a target-segment set as a sorted comma-separated
string
+// for deterministic per-source rebuild log lines.
+func sortedSegList(segs map[string]struct{}) string {
+ names := make([]string, 0, len(segs))
+ for s := range segs {
+ names = append(names, s)
+ }
+ sort.Strings(names)
+ return strings.Join(names, ",")
+}
+
+// ── Top-level orchestration
──────────────────────────────────────────────────.
+
+// copyIndexModeGroup copies one (entry, group) for an index-mode measure
group.
+// It rejects groups carrying shard parts (a non-_top_n_result normal measure
+// has data), then for each source seg sidx either byte-copies the whole sidx
+// directory (fast path: all docs align to one not-yet-written target seg) or
+// rebuilds and routes docs through the slow path. Each touched target segment
+// gets its <segDir>/metadata so the runtime loads it instead of pruning it.
+func copyIndexModeGroup(ctx context.Context, in migration.EntryGroupInput,
+ ruleByID map[uint32]indexRuleInfo, schemasBySubject
map[string]*measureSchemaInfo,
+) (migration.EntryGroupResult, error) {
+ var res migration.EntryGroupResult
+ hasPart, err := hasShardPart(ctx, in.SrcRoots)
Review Comment:
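The deferred closeAll error handling will not work as intended, because
copyIndexModeGroup does not use named return values.

In Go, the result values of `return res, nil` are fixed before deferred
functions run. A deferred closure can change what the caller receives only by
assigning to a named result parameter, so assigning to the local `err` inside
the defer has no effect. A close/flush failure is therefore silently dropped,
and a partially written sidx can be reported as success.

To fix this:
- Name the results, e.g. `(res migration.EntryGroupResult, err error)`.
- Drop the now-redundant `var res` declaration.
- In the defer, assign the closeAll error to `err` only when `err` is still nil, so an earlier error is not masked.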
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]